diff --git a/docs/api/jobs.md b/docs/api/jobs.md index 8bf50f8..08fb759 100644 --- a/docs/api/jobs.md +++ b/docs/api/jobs.md @@ -20,7 +20,7 @@ Creates a new job and returns the job id. * **priority**, int optional priority. Higher numbers have, um, higher priority - + * **id**, uuid optional id. If not set, a uuid will automatically created @@ -41,6 +41,10 @@ Available in constructor as a default, or overridden in send. Default: false. Enables exponential backoff retries based on retryDelay instead of a fixed delay. Sets initial retryDelay to 1 if not set. +* **maxRetryDelay**, int + + Default: no limit. Maximum delay between retries of failed jobs, in seconds. Only used when retryBackoff is true. + **Expiration options** * **expireInSeconds**, number @@ -84,7 +88,7 @@ Available in constructor as a default, or overridden in send. **Connection options** * **db**, object - + Instead of using pg-boss's default adapter, you can use your own, as long as it implements the following interface (the same as the pg module). ```ts @@ -284,7 +288,7 @@ await Promise.allSettled(jobs.map(async job => { Deletes a job by id. -> Job deletion is offered if desired for a "fetch then delete" workflow similar to SQS. This is not the default behavior for workers so "everything just works" by default, including job throttling and debouncing, which requires jobs to exist to enforce a unique constraint. For example, if you are debouncing a queue to "only allow 1 job per hour", deleting jobs after processing would re-open that time slot, breaking your throttling policy. +> Job deletion is offered if desired for a "fetch then delete" workflow similar to SQS. This is not the default behavior for workers so "everything just works" by default, including job throttling and debouncing, which requires jobs to exist to enforce a unique constraint. For example, if you are debouncing a queue to "only allow 1 job per hour", deleting jobs after processing would re-open that time slot, breaking your throttling policy. ### `deleteJob(name, [ids], options)` @@ -298,7 +302,7 @@ Cancels a pending or active job. Cancels a set of pending or active jobs. -When passing an array of ids, it's possible that the operation may partially succeed based on the state of individual jobs requested. Consider this a best-effort attempt. +When passing an array of ids, it's possible that the operation may partially succeed based on the state of individual jobs requested. Consider this a best-effort attempt. ### `resume(name, id, options)` diff --git a/src/attorney.js b/src/attorney.js index 500b057..11020a7 100644 --- a/src/attorney.js +++ b/src/attorney.js @@ -276,10 +276,13 @@ function applyRetryConfig (config, defaults) { assert(!('retryDelay' in config) || (Number.isInteger(config.retryDelay) && config.retryDelay >= 0), 'retryDelay must be an integer >= 0') assert(!('retryLimit' in config) || (Number.isInteger(config.retryLimit) && config.retryLimit >= 0), 'retryLimit must be an integer >= 0') assert(!('retryBackoff' in config) || (config.retryBackoff === true || config.retryBackoff === false), 'retryBackoff must be either true or false') + assert(!('maxRetryDelay' in config) || config.maxRetryDelay === undefined || config.retryBackoff === true, 'maxRetryDelay can only be set if retryBackoff is true') + assert(!('maxRetryDelay' in config) || config.maxRetryDelay === undefined || (Number.isInteger(config.maxRetryDelay) && config.maxRetryDelay >= 0), 'maxRetryDelay must be an integer >= 0') config.retryDelayDefault = defaults?.retryDelay config.retryLimitDefault = defaults?.retryLimit config.retryBackoffDefault = defaults?.retryBackoff + config.maxRetryDelayDefault = defaults?.maxRetryDelay } function applyPollingInterval (config, defaults) { diff --git a/src/manager.js b/src/manager.js index bea0e41..dbd081b 100644 --- a/src/manager.js +++ b/src/manager.js @@ -347,7 +347,9 @@ class Manager extends EventEmitter { retryDelay, retryDelayDefault, retryBackoff, - retryBackoffDefault + retryBackoffDefault, + maxRetryDelay, + maxRetryDelayDefault } = options const values = [ @@ -369,7 +371,9 @@ class Manager extends EventEmitter { retryDelay, // 16 retryDelayDefault, // 17 retryBackoff, // 18 - retryBackoffDefault // 19 + retryBackoffDefault, // 19 + maxRetryDelay, // 20 + maxRetryDelayDefault // 21 ] const db = wrapper || this.db @@ -405,7 +409,8 @@ class Manager extends EventEmitter { this.config.keepUntil, // 3 this.config.retryLimit, // 4 this.config.retryDelay, // 5 - this.config.retryBackoff // 6 + this.config.retryBackoff, // 6 + this.config.maxRetryDelay // 7 ] const { rows } = await db.executeSql(this.insertJobsCommand, params) @@ -529,6 +534,7 @@ class Manager extends EventEmitter { retryLimit, retryDelay, retryBackoff, + maxRetryDelay, expireInSeconds, retentionMinutes, deadLetter @@ -544,6 +550,7 @@ class Manager extends EventEmitter { retryLimit, retryDelay, retryBackoff, + maxRetryDelay, expireInSeconds, retentionMinutes, deadLetter @@ -568,20 +575,22 @@ class Manager extends EventEmitter { retryLimit, retryDelay, retryBackoff, + maxRetryDelay, expireInSeconds, retentionMinutes, deadLetter } = Attorney.checkQueueArgs(name, options) const params = [ - name, - policy, - retryLimit, - retryDelay, - retryBackoff, - expireInSeconds, - retentionMinutes, - deadLetter + name, // 1 + policy, // 2 + retryLimit, // 3 + retryDelay, // 4 + retryBackoff, // 5 + maxRetryDelay, // 6 + expireInSeconds, // 7 + retentionMinutes, // 8 + deadLetter // 9 ] await this.db.executeSql(this.updateQueueCommand, params) diff --git a/src/migrationStore.js b/src/migrationStore.js index b2e4e2a..dcbd362 100644 --- a/src/migrationStore.js +++ b/src/migrationStore.js @@ -64,6 +64,159 @@ function migrate (value, version, migrations) { function getAll (schema) { return [ + { + release: '10.2.0', + version: 25, + previous: 24, + install: [ + `ALTER TABLE ${schema}.queue + ADD COLUMN IF NOT EXISTS max_retry_delay INTEGER` + ], + uninstall: [ + `ALTER TABLE ${schema}.queue + DROP COLUMN IF EXISTS max_retry_delay` + ] + }, + { + release: '10.2.0', + version: 25, + previous: 24, + install: [ + `ALTER TABLE ${schema}.job + ADD COLUMN IF NOT EXISTS max_retry_delay INTEGER` + ], + uninstall: [ + `ALTER TABLE ${schema}.job + DROP COLUMN IF EXISTS max_retry_delay` + ] + }, + { + release: '10.2.0', + version: 25, + previous: 24, + install: [ + ` + CREATE OR REPLACE FUNCTION ${schema}.create_queue(queue_name text, options json) + RETURNS VOID AS + $$ + DECLARE + table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex'); + queue_created_on timestamptz; + BEGIN + + WITH q as ( + INSERT INTO ${schema}.queue ( + name, + policy, + retry_limit, + retry_delay, + retry_backoff, + max_retry_delay, + expire_seconds, + retention_minutes, + dead_letter, + partition_name + ) + VALUES ( + queue_name, + options->>'policy', + (options->>'retryLimit')::int, + (options->>'retryDelay')::int, + (options->>'retryBackoff')::bool, + (options->>'maxRetryDelay')::int, + (options->>'expireInSeconds')::int, + (options->>'retentionMinutes')::int, + options->>'deadLetter', + table_name + ) + ON CONFLICT DO NOTHING + RETURNING created_on + ) + SELECT created_on into queue_created_on from q; + + IF queue_created_on IS NULL THEN + RETURN; + END IF; + + EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name); + + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD PRIMARY KEY (name, id)', table_name); + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name); + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i1 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''created'' AND policy = ''short''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i2 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''active'' AND policy = ''singleton''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i3 ON ${schema}.%1$I (name, state, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''stately''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i4 ON ${schema}.%1$I (name, singleton_on, COALESCE(singleton_key, '''')) WHERE state <> ''cancelled'' AND singleton_on IS NOT NULL', table_name); + EXECUTE format('CREATE INDEX %1$s_i5 ON ${schema}.%1$I (name, start_after) INCLUDE (priority, created_on, id) WHERE state < ''active''', table_name); + + EXECUTE format('ALTER TABLE ${schema}.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name); + EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.%I FOR VALUES IN (%L)', table_name, queue_name); + END; + $$ + LANGUAGE plpgsql + ` + ], + uninstall: [ + ` + CREATE OR REPLACE FUNCTION ${schema}.create_queue(queue_name text, options json) + RETURNS VOID AS + $$ + DECLARE + table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex'); + queue_created_on timestamptz; + BEGIN + + WITH q as ( + INSERT INTO ${schema}.queue ( + name, + policy, + retry_limit, + retry_delay, + retry_backoff, + expire_seconds, + retention_minutes, + dead_letter, + partition_name + ) + VALUES ( + queue_name, + options->>'policy', + (options->>'retryLimit')::int, + (options->>'retryDelay')::int, + (options->>'retryBackoff')::bool, + (options->>'expireInSeconds')::int, + (options->>'retentionMinutes')::int, + options->>'deadLetter', + table_name + ) + ON CONFLICT DO NOTHING + RETURNING created_on + ) + SELECT created_on into queue_created_on from q; + + IF queue_created_on IS NULL THEN + RETURN; + END IF; + + EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name); + + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD PRIMARY KEY (name, id)', table_name); + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name); + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i1 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''created'' AND policy = ''short''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i2 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''active'' AND policy = ''singleton''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i3 ON ${schema}.%1$I (name, state, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''stately''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i4 ON ${schema}.%1$I (name, singleton_on, COALESCE(singleton_key, '''')) WHERE state <> ''cancelled'' AND singleton_on IS NOT NULL', table_name); + EXECUTE format('CREATE INDEX %1$s_i5 ON ${schema}.%1$I (name, start_after) INCLUDE (priority, created_on, id) WHERE state < ''active''', table_name); + + EXECUTE format('ALTER TABLE ${schema}.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name); + EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.%I FOR VALUES IN (%L)', table_name, queue_name); + END; + $$ + LANGUAGE plpgsql + ` + ] + }, { release: '10.1.5', version: 24, @@ -111,7 +264,7 @@ function getAll (schema) { END IF; EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name); - + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD PRIMARY KEY (name, id)', table_name); EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name); EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name); diff --git a/src/plans.js b/src/plans.js index 8489ff8..5588969 100644 --- a/src/plans.js +++ b/src/plans.js @@ -134,13 +134,14 @@ function createTableQueue (schema) { retry_limit int, retry_delay int, retry_backoff bool, + max_retry_delay int, expire_seconds int, retention_minutes int, dead_letter text REFERENCES ${schema}.queue (name), partition_name text, created_on timestamp with time zone not null default now(), updated_on timestamp with time zone not null default now(), - PRIMARY KEY (name) + PRIMARY KEY (name) ) ` } @@ -184,6 +185,7 @@ function createTableJob (schema) { retry_count integer not null default(0), retry_delay integer not null default(0), retry_backoff boolean not null default false, + max_retry_delay integer, start_after timestamp with time zone not null default now(), started_on timestamp with time zone, singleton_key text, @@ -208,7 +210,8 @@ const allJobColumns = `${baseJobColumns}, retry_count as "retryCount", retry_delay as "retryDelay", retry_backoff as "retryBackoff", - start_after as "startAfter", + max_retry_delay as "maxRetryDelay", + start_after as "startAfter", started_on as "startedOn", singleton_key as "singletonKey", singleton_on as "singletonOn", @@ -237,6 +240,7 @@ function createQueueFunction (schema) { retry_limit, retry_delay, retry_backoff, + max_retry_delay, expire_seconds, retention_minutes, dead_letter, @@ -248,6 +252,7 @@ function createQueueFunction (schema) { (options->>'retryLimit')::int, (options->>'retryDelay')::int, (options->>'retryBackoff')::bool, + (options->>'maxRetryDelay')::int, (options->>'expireInSeconds')::int, (options->>'retentionMinutes')::int, options->>'deadLetter', @@ -263,7 +268,7 @@ function createQueueFunction (schema) { END IF; EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name); - + EXECUTE format('${formatPartitionCommand(createPrimaryKeyJob(schema))}', table_name); EXECUTE format('${formatPartitionCommand(createQueueForeignKeyJob(schema))}', table_name); EXECUTE format('${formatPartitionCommand(createQueueForeignKeyJobDeadLetter(schema))}', table_name); @@ -292,7 +297,7 @@ function deleteQueueFunction (schema) { $$ DECLARE table_name varchar; - BEGIN + BEGIN WITH deleted as ( DELETE FROM ${schema}.queue WHERE name = queue_name @@ -390,9 +395,10 @@ function updateQueue (schema) { retry_limit = COALESCE($3, retry_limit), retry_delay = COALESCE($4, retry_delay), retry_backoff = COALESCE($5, retry_backoff), - expire_seconds = COALESCE($6, expire_seconds), - retention_minutes = COALESCE($7, retention_minutes), - dead_letter = COALESCE($8, dead_letter), + max_retry_delay = $6, + expire_seconds = COALESCE($7, expire_seconds), + retention_minutes = COALESCE($8, retention_minutes), + dead_letter = COALESCE($9, dead_letter), updated_on = now() WHERE name = $1 ` @@ -400,12 +406,13 @@ function updateQueue (schema) { function getQueues (schema) { return ` - SELECT + SELECT name, policy, retry_limit as "retryLimit", retry_delay as "retryDelay", retry_backoff as "retryBackoff", + max_retry_delay as "maxRetryDelay", expire_seconds as "expireInSeconds", retention_minutes as "retentionMinutes", dead_letter as "deadLetter", @@ -520,7 +527,7 @@ function fetchNextJob (schema) { retry_count = CASE WHEN started_on IS NOT NULL THEN retry_count + 1 ELSE retry_count END FROM next WHERE name = $1 AND j.id = next.id - RETURNING j.${includeMetadata ? allJobColumns : baseJobColumns} + RETURNING j.${includeMetadata ? allJobColumns : baseJobColumns} ` } @@ -571,6 +578,7 @@ function failJobs (schema, where, output) { retry_count, retry_delay, retry_backoff, + max_retry_delay, start_after, started_on, singleton_key, @@ -596,12 +604,17 @@ function failJobs (schema, where, output) { retry_count, retry_delay, retry_backoff, + max_retry_delay, CASE WHEN retry_count = retry_limit THEN start_after WHEN NOT retry_backoff THEN now() + retry_delay * interval '1' - ELSE now() + ( + ELSE now() + + LEAST( + ( retry_delay * 2 ^ LEAST(16, retry_count + 1) / 2 + retry_delay * 2 ^ LEAST(16, retry_count + 1) / 2 * random() + ), + max_retry_delay ) * interval '1' END as start_after, started_on, @@ -615,7 +628,7 @@ function failJobs (schema, where, output) { END as completed_on, keep_until, dead_letter, - policy, + policy, ${output} FROM deleted_jobs ON CONFLICT DO NOTHING @@ -632,6 +645,7 @@ function failJobs (schema, where, output) { retry_count, retry_delay, retry_backoff, + max_retry_delay, start_after, started_on, singleton_key, @@ -654,6 +668,7 @@ function failJobs (schema, where, output) { retry_count, retry_delay, retry_backoff, + max_retry_delay, start_after, started_on, singleton_key, @@ -726,7 +741,7 @@ function deleteJobs (schema) { with results as ( DELETE FROM ${schema}.job WHERE name = $1 - AND id IN (SELECT UNNEST($2::uuid[])) + AND id IN (SELECT UNNEST($2::uuid[])) RETURNING 1 ) SELECT COUNT(*) from results @@ -749,6 +764,7 @@ function insertJob (schema) { retry_limit, retry_delay, retry_backoff, + max_retry_delay, policy ) SELECT @@ -777,6 +793,11 @@ function insertJob (schema) { ELSE COALESCE(j.retry_delay, q.retry_delay, retry_delay_default, 0) END as retry_delay, COALESCE(j.retry_backoff, q.retry_backoff, retry_backoff_default, false) as retry_backoff, + CASE + WHEN COALESCE(j.retry_backoff, q.retry_backoff, retry_backoff_default, false) + THEN COALESCE(j.max_retry_delay, q.max_retry_delay, max_retry_delay_default) + ELSE NULL + END as max_retry_delay, q.policy FROM ( SELECT @@ -803,7 +824,9 @@ function insertJob (schema) { $16::int as retry_delay, $17::int as retry_delay_default, $18::bool as retry_backoff, - $19::bool as retry_backoff_default + $19::bool as retry_backoff_default, + $20::int as max_retry_delay, + $21::int as max_retry_delay_default ) j JOIN ${schema}.queue q ON j.name = q.name ON CONFLICT DO NOTHING RETURNING id @@ -813,12 +836,13 @@ function insertJob (schema) { function insertJobs (schema) { return ` WITH defaults as ( - SELECT + SELECT $2 as expire_in, $3 as keep_until, $4::int as retry_limit, $5::int as retry_delay, - $6::bool as retry_backoff + $6::bool as retry_backoff, + $7::int as max_retry_delay ) INSERT INTO ${schema}.job ( id, @@ -834,6 +858,7 @@ function insertJobs (schema) { retry_limit, retry_delay, retry_backoff, + max_retry_delay, policy ) SELECT @@ -863,8 +888,13 @@ function insertJobs (schema) { WHEN COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false) THEN GREATEST(COALESCE("retryDelay", q.retry_delay, defaults.retry_delay), 1) ELSE COALESCE("retryDelay", q.retry_delay, defaults.retry_delay, 0) - END as retry_delay, + END as retry_delay, COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false) as retry_backoff, + CASE + WHEN COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false) + THEN COALESCE("maxRetryDelay", q.max_retry_delay, defaults.max_retry_delay) + ELSE NULL + END as max_retry_delay, q.policy FROM ( SELECT *, @@ -881,12 +911,13 @@ function insertJobs (schema) { "retryLimit" integer, "retryDelay" integer, "retryBackoff" boolean, + "maxRetryDelay" integer, "singletonKey" text, "singletonSeconds" integer, "expireInSeconds" integer, "keepUntil" timestamp with time zone, "deadLetter" text - ) + ) ) j JOIN ${schema}.queue q ON j.name = q.name, defaults @@ -902,7 +933,7 @@ function drop (schema, interval) { } function archive (schema, completedInterval, failedInterval = completedInterval) { - const columns = 'id, name, priority, data, state, retry_limit, retry_count, retry_delay, retry_backoff, start_after, started_on, singleton_key, singleton_on, expire_in, created_on, completed_on, keep_until, dead_letter, policy, output' + const columns = 'id, name, priority, data, state, retry_limit, retry_count, retry_delay, retry_backoff, max_retry_delay, start_after, started_on, singleton_key, singleton_on, expire_in, created_on, completed_on, keep_until, dead_letter, policy, output' return ` WITH archived_rows AS ( diff --git a/test/fetchTest.js b/test/fetchTest.js index 914bf2c..8581530 100644 --- a/test/fetchTest.js +++ b/test/fetchTest.js @@ -58,6 +58,7 @@ describe('fetch', function () { assert(job.retryCount === 0) assert(job.retryDelay === 0) assert(job.retryBackoff === false) + assert(job.maxRetryDelay === null) assert(job.startAfter !== undefined) assert(job.startedOn !== undefined) assert(job.singletonKey === null) @@ -92,6 +93,70 @@ describe('fetch', function () { assert(job.retryCount === 0) assert(job.retryDelay === 0) assert(job.retryBackoff === false) + assert(job.maxRetryDelay === null) + assert(job.startAfter !== undefined) + assert(job.startedOn !== undefined) + assert(job.singletonKey === null) + assert(job.singletonOn === null) + assert(job.expireIn.minutes === 15) + assert(job.createdOn !== undefined) + assert(job.completedOn === null) + assert(job.keepUntil !== undefined) + } + }) + + it('should fetch all metadata for a single job with exponential backoff when requested', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + + await boss.send(queue, null, { retryDelay: 1, retryBackoff: true, maxRetryDelay: 10 }) + const [job] = await boss.fetch(queue, { includeMetadata: true }) + + assert(queue === job.name) + assert(job.priority === 0) + assert(job.state === 'active') + assert(job.policy !== undefined) + assert(job.retryLimit === 0) + assert(job.retryCount === 0) + assert(job.retryDelay === 1) + assert(job.retryBackoff === true) + assert(job.maxRetryDelay === 10) + assert(job.startAfter !== undefined) + assert(job.startedOn !== undefined) + assert(job.singletonKey === null) + assert(job.singletonOn === null) + assert(job.expireIn.minutes === 15) + assert(job.createdOn !== undefined) + assert(job.completedOn === null) + assert(job.keepUntil !== undefined) + }) + + it('should fetch all metadata for a batch of jobs with exponential backoff when requested', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + const options = { retryDelay: 1, retryBackoff: true, maxRetryDelay: 10 } + const batchSize = 4 + + await Promise.all([ + boss.send(queue, null, options), + boss.send(queue, null, options), + boss.send(queue, null, options), + boss.send(queue, null, options) + ]) + + const jobs = await boss.fetch(queue, { batchSize, includeMetadata: true }) + assert(jobs.length === batchSize) + + for (const job of jobs) { + assert(queue === job.name) + assert(job.priority === 0) + assert(job.state === 'active') + assert(job.policy !== undefined) + assert(job.retryLimit === 0) + assert(job.retryCount === 0) + assert(job.retryDelay === 1) + assert(job.retryBackoff === true) + assert(job.maxRetryDelay === 10) assert(job.startAfter !== undefined) assert(job.startedOn !== undefined) assert(job.singletonKey === null) diff --git a/test/insertTest.js b/test/insertTest.js index e83296f..3959d59 100644 --- a/test/insertTest.js +++ b/test/insertTest.js @@ -31,6 +31,7 @@ describe('insert', function () { retryLimit: 1, retryDelay: 2, retryBackoff: true, + maxRetryDelay: 3, startAfter: new Date().toISOString(), expireInSeconds: 5, singletonKey: '123', @@ -49,6 +50,7 @@ describe('insert', function () { assert.strictEqual(job.retryLimit, input.retryLimit, `retryLimit input ${input.retryLimit} didn't match job ${job.retryLimit}`) assert.strictEqual(job.retryDelay, input.retryDelay, `retryDelay input ${input.retryDelay} didn't match job ${job.retryDelay}`) assert.strictEqual(job.retryBackoff, input.retryBackoff, `retryBackoff input ${input.retryBackoff} didn't match job ${job.retryBackoff}`) + assert.strictEqual(job.maxRetryDelay, input.maxRetryDelay, `maxRetryDelay input ${input.maxRetryDelay} didn't match job ${job.maxRetryDelay}`) assert.strictEqual(new Date(job.startAfter).toISOString(), input.startAfter, `startAfter input ${input.startAfter} didn't match job ${job.startAfter}`) assert.strictEqual(job.expireIn.seconds, input.expireInSeconds, `expireInSeconds input ${input.expireInSeconds} didn't match job ${job.expireIn}`) assert.strictEqual(job.singletonKey, input.singletonKey, `name input ${input.singletonKey} didn't match job ${job.singletonKey}`) @@ -71,6 +73,7 @@ describe('insert', function () { retryLimit: 1, retryDelay: 2, retryBackoff: true, + maxRetryDelay: 3, startAfter: new Date().toISOString(), expireInSeconds: 5, singletonKey: '123', @@ -99,6 +102,7 @@ describe('insert', function () { assert.strictEqual(job.retryLimit, input.retryLimit, `retryLimit input ${input.retryLimit} didn't match job ${job.retryLimit}`) assert.strictEqual(job.retryDelay, input.retryDelay, `retryDelay input ${input.retryDelay} didn't match job ${job.retryDelay}`) assert.strictEqual(job.retryBackoff, input.retryBackoff, `retryBackoff input ${input.retryBackoff} didn't match job ${job.retryBackoff}`) + assert.strictEqual(job.maxRetryDelay, input.maxRetryDelay, `maxRetryDelay input ${input.maxRetryDelay} didn't match job ${job.maxRetryDelay}`) assert.strictEqual(new Date(job.startAfter).toISOString(), input.startAfter, `startAfter input ${input.startAfter} didn't match job ${job.startAfter}`) assert.strictEqual(job.expireIn.seconds, input.expireInSeconds, `expireInSeconds input ${input.expireInSeconds} didn't match job ${job.expireIn}`) assert.strictEqual(job.singletonKey, input.singletonKey, `name input ${input.singletonKey} didn't match job ${job.singletonKey}`) diff --git a/test/queueTest.js b/test/queueTest.js index ce1bdfa..b932b0a 100644 --- a/test/queueTest.js +++ b/test/queueTest.js @@ -131,10 +131,11 @@ describe('queues', function () { const createProps = { policy: 'standard', retryLimit: 1, - retryBackoff: false, - retryDelay: 1, - expireInSeconds: 1, - retentionMinutes: 1, + retryBackoff: true, + retryDelay: 2, + maxRetryDelay: 3, + expireInSeconds: 4, + retentionMinutes: 5, deadLetter } @@ -147,6 +148,7 @@ describe('queues', function () { assert.strictEqual(createProps.retryLimit, queueObj.retryLimit) assert.strictEqual(createProps.retryBackoff, queueObj.retryBackoff) assert.strictEqual(createProps.retryDelay, queueObj.retryDelay) + assert.strictEqual(createProps.maxRetryDelay, queueObj.maxRetryDelay) assert.strictEqual(createProps.expireInSeconds, queueObj.expireInSeconds) assert.strictEqual(createProps.retentionMinutes, queueObj.retentionMinutes) assert.strictEqual(createProps.deadLetter, queueObj.deadLetter) @@ -158,11 +160,12 @@ describe('queues', function () { const updateProps = { policy: 'short', - retryLimit: 2, - retryBackoff: true, + retryLimit: 1, + retryBackoff: false, retryDelay: 2, - expireInSeconds: 2, - retentionMinutes: 2, + maxRetryDelay: undefined, + expireInSeconds: 4, + retentionMinutes: 5, deadLetter } @@ -174,6 +177,7 @@ describe('queues', function () { assert.strictEqual(updateProps.retryLimit, queueObj.retryLimit) assert.strictEqual(updateProps.retryBackoff, queueObj.retryBackoff) assert.strictEqual(updateProps.retryDelay, queueObj.retryDelay) + assert.strictEqual(null, queueObj.maxRetryDelay) assert.strictEqual(updateProps.expireInSeconds, queueObj.expireInSeconds) assert.strictEqual(updateProps.retentionMinutes, queueObj.retentionMinutes) assert.strictEqual(updateProps.deadLetter, queueObj.deadLetter) @@ -190,8 +194,9 @@ describe('queues', function () { retryLimit: 1, retryBackoff: true, retryDelay: 2, - expireInSeconds: 3, - retentionMinutes: 4, + maxRetryDelay: 3, + expireInSeconds: 4, + retentionMinutes: 5, deadLetter } @@ -206,6 +211,7 @@ describe('queues', function () { assert.strictEqual(createProps.retryLimit, job.retryLimit) assert.strictEqual(createProps.retryBackoff, job.retryBackoff) assert.strictEqual(createProps.retryDelay, job.retryDelay) + assert.strictEqual(createProps.maxRetryDelay, job.maxRetryDelay) assert.strictEqual(createProps.expireInSeconds, job.expireIn.seconds) assert.strictEqual(createProps.retentionMinutes, retentionMinutes) assert.strictEqual(createProps.deadLetter, job.deadLetter) diff --git a/test/retryTest.js b/test/retryTest.js index abadbed..d111c6f 100644 --- a/test/retryTest.js +++ b/test/retryTest.js @@ -86,4 +86,38 @@ describe('retries', function () { assert(processCount < retryLimit) }) + + it('should limit retry delay with exponential backoff', async function () { + const boss = this.test.boss = await helper.start({ ...this.test.bossConfig }) + const queue = this.test.bossConfig.schema + + const retryLimit = 4 + + const startAfters = [] + + await boss.work(queue, { pollingIntervalSeconds: 1, includeMetadata: true }, async (jobs) => { + startAfters.push(jobs[0].startAfter) + throw new Error('retry') + }) + + await boss.send(queue, null, { + retryLimit, + retryDelay: 1, + retryBackoff: true, + maxRetryDelay: 3 + }) + + await delay(12000) + + const actualDelays = startAfters.map((startAfter, index) => { + if (index === 0) { + return 0 + } + return (startAfter - startAfters[index - 1]) / 1000 + }) + + for (const d of actualDelays) { + assert(d < 4, `Expected delay to be less than 4 seconds, but got ${d}`) + } + }).timeout(15000) }) diff --git a/types.d.ts b/types.d.ts index 69c6680..b37fdaa 100644 --- a/types.d.ts +++ b/types.d.ts @@ -93,6 +93,7 @@ declare namespace PgBoss { retryLimit?: number; retryDelay?: number; retryBackoff?: boolean; + maxRetryDelay?: number; } interface JobOptions { @@ -185,6 +186,7 @@ declare namespace PgBoss { retryCount: number; retryDelay: number; retryBackoff: boolean; + maxRetryDelay?: number; startAfter: Date; startedOn: Date; singletonKey: string | null; @@ -206,6 +208,7 @@ declare namespace PgBoss { retryLimit?: number; retryDelay?: number; retryBackoff?: boolean; + maxRetryDelay?: number; startAfter?: Date | string; singletonKey?: string; singletonSeconds?: number; diff --git a/version.json b/version.json index bbdad9b..fa75247 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "schema": 24 + "schema": 25 }