From 3044aef9d9089b64c6590b16a04ad31c030ec792 Mon Sep 17 00:00:00 2001 From: Tim Jones Date: Fri, 18 Jun 2021 12:40:01 -0500 Subject: [PATCH] existing schema + interval fixes (#256) * interval fixes * release notes * release notes * update version * docs * simplify graceful shutdown * unsubscribe cron during stop * clear rejection timeout after race * docs * versioning update * update changelog [skip ci] * include interval bypass even if metadata is requested --- CHANGELOG.md | 6 +++++ docs/usage.md | 14 ++++++++++- package-lock.json | 4 +-- package.json | 2 +- src/index.js | 56 +++++++++++++++++++---------------------- src/manager.js | 51 +++++++++++++++++++++---------------- src/plans.js | 22 +--------------- src/timekeeper.js | 47 ++++++++++++++++++---------------- test/delayTest.js | 24 ++++++------------ test/migrationTest.js | 4 +-- test/multiMasterTest.js | 8 +++--- test/opsTest.js | 4 +-- test/scheduleTest.js | 10 +++----- test/subscribeTest.js | 8 +++--- 14 files changed, 129 insertions(+), 131 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 47edcb80..84f30f12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changes +## 6.1.0 + +- Existing schemas can now be used via the `schema` property in the constructor. +- Fixed expiration rejection in subscriptions when the pg driver wasn't returning an interval object. +- Removed timers causing process to hang during shut down + ## 6.0.1 - Typescript types updated for `stop()`. PR from @stnwk diff --git a/docs/usage.md b/docs/usage.md index a98f4576..ca2a3759 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -90,6 +90,8 @@ GRANT CREATE ON DATABASE db1 TO leastprivuser; If the CREATE privilege is not available or desired, you can use the included [static functions](#static-functions) to export the SQL commands to manually create or upgrade the required database schema. **This means you will also need to monitor future releases for schema changes** (the schema property in [version.json](../version.json)) so they can be applied manually. +NOTE: Using an existing schema is supported for advanced use cases **but discouraged**, as this opens up the possibility that creation will fail on an object name collision, and it will add more steps to the uninstallation process. + # Database uninstall If you need to uninstall pg-boss from a database, just run the following command. @@ -98,7 +100,17 @@ If you need to uninstall pg-boss from a database, just run the following command DROP SCHEMA $1 CASCADE ``` -Where `$1` is the name of your schema if you've customized it. Otherwise, the default schema is `pgboss`. +Where `$1` is the name of your schema if you've customized it. Otherwise, the default schema is `pgboss`. + +NOTE: If an existing schema was used during installation, created objects will need to be removed manually using the following commands. + +```sql +DROP TABLE ${schema}.archive; +DROP TABLE ${schema}.job; +DROP TABLE ${schema}.schedule; +DROP TABLE ${schema}.version; +DROP TYPE ${schema}.job_state; +``` # Direct database interactions diff --git a/package-lock.json b/package-lock.json index 377e7e25..2c115342 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,11 +1,11 @@ { "name": "pg-boss", - "version": "6.0.0", + "version": "6.1.0", "lockfileVersion": 2, "requires": true, "packages": { "": { - "version": "6.0.0", + "version": "6.1.0", "license": "MIT", "dependencies": { "cron-parser": "^3.3.0", diff --git a/package.json b/package.json index 4c6533e6..a8e29892 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "pg-boss", - "version": "6.0.1", + "version": "6.1.0", "description": "Queueing jobs in Node.js using PostgreSQL like a boss", "main": "./src/index.js", "engines": { diff --git a/src/index.js b/src/index.js index 83552c9c..3a362d05 100644 --- a/src/index.js +++ b/src/index.js @@ -131,49 +131,45 @@ class PgBoss extends EventEmitter { await this.manager.stop() await this.timekeeper.stop() - let polling = false - const shutdown = async () => { - try { - await this.boss.stop() + if (this.db.isOurs) { + await this.db.close() + } - if (this.db.isOurs) { - await this.db.close() - } - } catch (err) { - if (polling) { - this.emit(events.error, err) - } else { - throw err - } - } finally { - this.stopped = true - this.stoppingOn = null + this.stopped = true + this.stoppingOn = null - this.emit(events.stopped) - } + this.emit(events.stopped) } if (!graceful) { - return await shutdown() + await this.boss.stop() + await shutdown() + return } - if (this.manager.getWipData().length === 0) { - return await shutdown() - } + setImmediate(async () => { + let closing = false - polling = true + try { + while (Date.now() - this.stoppingOn < timeout) { + if (this.manager.getWipData({ includeInternal: closing }).length === 0) { + if (closing) { + break + } - setImmediate(async () => { - while (Date.now() - this.stoppingOn < timeout) { - await delay(1000) + closing = true + + await this.boss.stop() + } - if (this.manager.getWipData().length === 0) { - return await shutdown() + await delay(1000) } - } - await shutdown() + await shutdown() + } catch (err) { + this.emit(events.error, err) + } }) } } diff --git a/src/manager.js b/src/manager.js index 88a26d6e..c407f5e9 100644 --- a/src/manager.js +++ b/src/manager.js @@ -111,7 +111,9 @@ class Manager extends EventEmitter { } } - getWipData () { + getWipData (options = {}) { + const { includeInternal = false } = options + const data = this.getWorkers() .map(({ id, @@ -138,7 +140,7 @@ class Manager extends EventEmitter { lastError, lastErrorOn })) - .filter(i => i.count > 0 && !INTERNAL_QUEUES[i.name]) + .filter(i => i.count > 0 && (!INTERNAL_QUEUES[i.name] || includeInternal)) return data } @@ -165,24 +167,32 @@ class Manager extends EventEmitter { throw new Error('__test__throw_subscription') } - const expirationRace = (promise, timeout) => Promise.race([ - promise, - delay.reject(timeout, { value: new Error(`handler execution exceeded ${timeout}ms`) }) - ]) + const resolveWithinSeconds = async (promise, seconds) => { + const timeout = Math.max(1, seconds) * 1000 + const reject = delay.reject(timeout, { value: new Error(`handler execution exceeded ${timeout}ms`) }) + + const result = await Promise.race([promise, reject]) + + try { + reject.clear() + } catch {} + + return result + } this.emitWip(name) let result if (batchSize) { - const maxTimeout = jobs.reduce((acc, i) => Math.max(acc, plans.intervalToMs(i.expirein)), 0) + const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expire_in_seconds), 0) // Failing will fail all fetched jobs - result = await expirationRace(Promise.all([callback(jobs)]), maxTimeout) + result = await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration) .catch(err => this.fail(jobs.map(job => job.id), err)) } else { result = await pMap(jobs, job => - expirationRace(callback(job), plans.intervalToMs(job.expirein)) + resolveWithinSeconds(callback(job), job.expire_in_seconds) .then(result => this.complete(job.id, result)) .catch(err => this.fail(job.id, err)) , { concurrency: teamConcurrency } @@ -228,13 +238,15 @@ class Manager extends EventEmitter { worker.stop() } - setInterval(() => { - if (workers.every(w => w.stopped)) { - for (const worker of workers) { - this.removeWorker(worker) - } + setImmediate(async () => { + while (!workers.every(w => w.stopped)) { + await delay(1000) + } + + for (const worker of workers) { + this.removeWorker(worker) } - }, 1000) + }) } async offComplete (value) { @@ -374,12 +386,13 @@ class Manager extends EventEmitter { async fetch (name, batchSize, options = {}) { const values = Attorney.checkFetchArgs(name, batchSize, options) + const result = await this.db.executeSql( this.nextJobCommand(options.includeMetadata || false), [values.name, batchSize || 1] ) - if (!result) { + if (!result || result.rows.length === 0) { return null } @@ -394,11 +407,7 @@ class Manager extends EventEmitter { return job }) - return jobs.length === 0 - ? null - : jobs.length === 1 && !batchSize - ? jobs[0] - : jobs + return jobs.length === 1 && !batchSize ? jobs[0] : jobs } async fetchCompleted (name, batchSize, options = {}) { diff --git a/src/plans.js b/src/plans.js index e7cf7531..c455d486 100644 --- a/src/plans.js +++ b/src/plans.js @@ -18,20 +18,6 @@ const MUTEX = 1337968055000 const MIGRATE_RACE_MESSAGE = 'division by zero' const CREATE_RACE_MESSAGE = 'already exists' -const SECOND = 1000 -const MINUTE = SECOND * 60 -const HOUR = MINUTE * 60 -const DAY = HOUR * 24 - -// source: pg.types -> postgres-interval -const INTERVAL_TO_MS_MAP = { - days: DAY, - hours: HOUR, - minutes: MINUTE, - seconds: SECOND, - milliseconds: 1 -} - module.exports = { create, insertVersion, @@ -61,7 +47,6 @@ module.exports = { setCronTime, locked, assertMigration, - intervalToMs, getArchivedJobById, getJobById, states: { ...states }, @@ -72,11 +57,6 @@ module.exports = { DEFAULT_SCHEMA } -function intervalToMs (interval) { - const ms = Object.keys(interval).reduce((total, key) => total + INTERVAL_TO_MS_MAP[key] * interval[key], 0) - return ms -} - function locked (query) { if (Array.isArray(query)) { query = query.join(';\n') @@ -344,7 +324,7 @@ function fetchNextJob (schema) { retryCount = CASE WHEN state = '${states.retry}' THEN retryCount + 1 ELSE retryCount END FROM nextJob WHERE j.id = nextJob.id - RETURNING ${includeMetadata ? 'j.*' : 'j.id, name, data, expireIn'} + RETURNING ${includeMetadata ? 'j.*' : 'j.id, name, data'}, EXTRACT(epoch FROM expireIn) as expire_in_seconds ` } diff --git a/src/timekeeper.js b/src/timekeeper.js index 834bf6b5..ca69a24e 100644 --- a/src/timekeeper.js +++ b/src/timekeeper.js @@ -39,34 +39,46 @@ class Timekeeper extends EventEmitter { this.unschedule, this.getSchedules ] + + this.stopped = true } async start () { + if (this.config.archiveSeconds < 60) { + return + } + await this.cacheClockSkew() - if (this.config.archiveSeconds >= 60) { - await this.watch() - this.cronMonitorInterval = setInterval(async () => await this.monitorCron(), this.cronMonitorIntervalMs) - } + await this.manager.subscribe(queues.CRON, { newJobCheckIntervalSeconds: 4 }, (job) => this.onCron(job)) + await this.manager.subscribe(queues.SEND_IT, { newJobCheckIntervalSeconds: 4, teamSize: 50, teamConcurrency: 5 }, (job) => this.onSendIt(job)) + + await this.cronMonitorAsync() + this.cronMonitorInterval = setInterval(async () => await this.monitorCron(), this.cronMonitorIntervalMs) this.skewMonitorInterval = setInterval(async () => await this.cacheClockSkew(), this.skewMonitorIntervalMs) this.stopped = false } async stop () { - if (!this.stopped) { - this.stopped = true + if (this.stopped) { + return + } - if (this.skewMonitorInterval) { - clearInterval(this.skewMonitorInterval) - this.skewMonitorInterval = null - } + this.stopped = true - if (this.cronMonitorInterval) { - clearInterval(this.cronMonitorInterval) - this.cronMonitorInterval = null - } + await this.manager.unsubscribe(queues.CRON) + await this.manager.unsubscribe(queues.SEND_IT) + + if (this.skewMonitorInterval) { + clearInterval(this.skewMonitorInterval) + this.skewMonitorInterval = null + } + + if (this.cronMonitorInterval) { + clearInterval(this.cronMonitorInterval) + this.cronMonitorInterval = null } } @@ -96,13 +108,6 @@ class Timekeeper extends EventEmitter { this.clockSkew = skew } - async watch () { - await this.manager.subscribe(queues.CRON, { newJobCheckIntervalSeconds: 4 }, (job) => this.onCron(job)) - await this.manager.subscribe(queues.SEND_IT, { newJobCheckIntervalSeconds: 4, teamSize: 50, teamConcurrency: 5 }, (job) => this.onSendIt(job)) - - await this.cronMonitorAsync() - } - async cronMonitorAsync () { const opts = { retryLimit: 2, diff --git a/test/delayTest.js b/test/delayTest.js index df1a0721..068d1f88 100644 --- a/test/delayTest.js +++ b/test/delayTest.js @@ -5,29 +5,21 @@ const delay = require('delay') describe('delayed jobs', function () { it('should wait until after an int (in seconds)', async function () { const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema - const delaySeconds = 2 - const queue = 'wait' + const startAfter = 2 - const data = { message: 'hold your horses', submitted: Date.now() } - const options = { startAfter: delaySeconds } - - await boss.publish(queue, data, options) + await boss.publish(queue, null, { startAfter }) - return new Promise((resolve, reject) => { - boss.subscribe(queue, async job => { - const start = new Date(job.data.submitted) - const end = new Date() + const job = await boss.fetch(queue) - const elapsedSeconds = Math.floor((end - start) / 1000) + assert.strictEqual(job, null) - await job.done() + await delay(startAfter * 1000) - assert(delaySeconds >= elapsedSeconds) + const job2 = await boss.fetch(queue) - resolve() - }) - }) + assert(job2) }) it('should wait until after a date time string', async function () { diff --git a/test/migrationTest.js b/test/migrationTest.js index 8c416bb3..d2c7d07b 100644 --- a/test/migrationTest.js +++ b/test/migrationTest.js @@ -136,7 +136,7 @@ describe('migration', function () { } catch (error) { assert(error.message.includes('wat')) } finally { - boss1.stop() + await boss1.stop({ graceful: false }) } const version1 = await contractor.version() @@ -154,6 +154,6 @@ describe('migration', function () { assert.strictEqual(version2, currentSchemaVersion) - await boss2.stop() + await boss2.stop({ graceful: false }) }) }) diff --git a/test/multiMasterTest.js b/test/multiMasterTest.js index 125dd9c8..0d3e42ca 100644 --- a/test/multiMasterTest.js +++ b/test/multiMasterTest.js @@ -22,7 +22,7 @@ describe('multi-master', function () { } catch (err) { assert(false) } finally { - await pMap(instances, i => i.stop()) + await pMap(instances, i => i.stop({ graceful: false })) } }) @@ -55,7 +55,7 @@ describe('multi-master', function () { } catch (err) { assert(false) } finally { - await pMap(instances, i => i.stop()) + await pMap(instances, i => i.stop({ graceful: false })) } }) @@ -86,7 +86,7 @@ describe('multi-master', function () { assert.strictEqual(beforeCount, jobCount) - await boss.stop() + await boss.stop({ graceful: false }) boss = new PgBoss(this.test.bossConfig) @@ -98,6 +98,6 @@ describe('multi-master', function () { assert.strictEqual(completedCount, 1) - await boss.stop() + await boss.stop({ graceful: false }) }) }) diff --git a/test/opsTest.js b/test/opsTest.js index 6c9c1e1b..8cc32b20 100644 --- a/test/opsTest.js +++ b/test/opsTest.js @@ -64,7 +64,7 @@ describe('ops', function () { }) it('should emit error during graceful stop if subscriptions busy', async function () { - const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, ...defaults, __test__throw_stop: true }) + const boss = await helper.start({ ...this.test.bossConfig, ...defaults, __test__throw_stop: true }) const queue = this.test.bossConfig.schema await boss.publish(queue) @@ -78,7 +78,7 @@ describe('ops', function () { }) it('should throw error during graceful stop if no subscriptions are busy', async function () { - const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, ...defaults, __test__throw_stop: true }) + const boss = await helper.start({ ...this.test.bossConfig, ...defaults, __test__throw_stop: true }) try { await boss.stop({ timeout: 1 }) diff --git a/test/scheduleTest.js b/test/scheduleTest.js index bb2c5498..f4a82274 100644 --- a/test/scheduleTest.js +++ b/test/scheduleTest.js @@ -72,7 +72,7 @@ describe('schedule', function () { await boss.schedule(queue, '* * * * *') - await boss.stop() + await boss.stop({ graceful: false }) boss = await helper.start({ ...this.test.bossConfig, noSupervisor: true }) @@ -82,19 +82,19 @@ describe('schedule', function () { assert(job) - await boss.stop() + await boss.stop({ graceful: false }) }) it('should remove previously scheduled job', async function () { const queue = 'schedule-remove' - const boss = await helper.start({ ...this.test.bossConfig, noSupervisor: true }) + const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, noSupervisor: true }) await boss.schedule(queue, '* * * * *') await boss.unschedule(queue) - await boss.stop() + await boss.stop({ graceful: false }) const db = await helper.getDb() await db.executeSql(plans.clearStorage(this.test.bossConfig.schema)) @@ -106,8 +106,6 @@ describe('schedule', function () { const job = await boss.fetch(queue) assert(job === null) - - await boss.stop() }) it('should publish job based on current minute in UTC', async function () { diff --git a/test/subscribeTest.js b/test/subscribeTest.js index 2dfc4039..c4f68783 100644 --- a/test/subscribeTest.js +++ b/test/subscribeTest.js @@ -50,10 +50,10 @@ describe('subscribe', function () { it('should honor a custom new job check interval', async function () { const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema - const queue = 'customJobCheckInterval' - const newJobCheckIntervalSeconds = 3 - const timeout = 9000 + const newJobCheckIntervalSeconds = 1 + const timeout = 5000 let subscribeCount = 0 const jobCount = 10 @@ -65,7 +65,7 @@ describe('subscribe', function () { await delay(timeout) - assert(subscribeCount <= timeout / 1000 / newJobCheckIntervalSeconds) + assert.strictEqual(subscribeCount, timeout / 1000 / newJobCheckIntervalSeconds) }) it('should unsubscribe a subscription', async function () {