Skip to content

Commit

Permalink
update types and fix byodb
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Jul 18, 2024
1 parent 99cf534 commit 132ebb3
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 134 deletions.
9 changes: 4 additions & 5 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,12 @@ The following options can be set as properties in an object for additional confi


```js
const text = "select 1 as value1 from table1 where bar = $1"
const values = ['foo']
const text = "select $1 as input"
const values = ['arg1']

const { rows, rowCount } = await executeSql(text, values)
const { rows } = await executeSql(text, values)

assert(rows[0].value1 === 1)
assert(rowCount === 1)
assert(rows[0].input === 'arg1')
```

* **schema** - string, defaults to "pgboss"
Expand Down
29 changes: 9 additions & 20 deletions src/boss.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ class Boss extends EventEmitter {
this.failJobsByTimeoutCommand = plans.locked(config.schema, plans.failJobsByTimeout(config.schema))
this.archiveCommand = plans.locked(config.schema, plans.archive(config.schema, config.archiveInterval, config.archiveFailedInterval))
this.dropCommand = plans.locked(config.schema, plans.drop(config.schema, config.deleteAfter))
this.getMaintenanceTimeCommand = plans.getMaintenanceTime(config.schema)
this.setMaintenanceTimeCommand = plans.setMaintenanceTime(config.schema)
this.getMonitorTimeCommand = plans.getMonitorTime(config.schema)
this.setMonitorTimeCommand = plans.setMonitorTime(config.schema)
this.trySetMaintenanceTimeCommand = plans.trySetMaintenanceTime(config.schema)
this.trySetMonitorTimeCommand = plans.trySetMonitorTime(config.schema)
this.countStatesCommand = plans.countStates(config.schema)

this.functions = [
Expand All @@ -48,8 +46,6 @@ class Boss extends EventEmitter {
}

async onMonitor () {
let locker

try {
if (this.monitoring) {
return
Expand All @@ -65,26 +61,24 @@ class Boss extends EventEmitter {
throw new Error(this.config.__test__throw_monitor)
}

locker = await this.db.lock({ key: 'monitor' })
if (this.stopped) {
return
}

const { secondsAgo } = await this.getMonitorTime()
const { rows } = await this.db.executeSql(this.trySetMonitorTimeCommand, [this.config.monitorStateIntervalSeconds])

if (secondsAgo > this.monitorStateIntervalSeconds && !this.stopped) {
if (rows.length === 1 && !this.stopped) {
const states = await this.countStates()
this.setMonitorTime()
this.emit(events.monitorStates, states)
}
} catch (err) {
this.emit(events.error, err)
} finally {
await locker?.unlock()
this.monitoring = false
}
}

async onSupervise () {
let locker

try {
if (this.maintaining) {
return
Expand All @@ -105,18 +99,15 @@ class Boss extends EventEmitter {
return
}

locker = await this.db.lock({ key: 'maintenance' })

const { secondsAgo } = await this.getMaintenanceTime()
const { rows } = await this.db.executeSql(this.trySetMaintenanceTimeCommand, [this.config.maintenanceIntervalSeconds])

if (secondsAgo > this.maintenanceIntervalSeconds) {
if (rows.length === 1 && !this.stopped) {
const result = await this.maintain()
this.emit(events.maintenance, result)
}
} catch (err) {
this.emit(events.error, err)
} finally {
await locker?.unlock()
this.maintaining = false
}
}
Expand All @@ -130,8 +121,6 @@ class Boss extends EventEmitter {

const ended = Date.now()

await this.setMaintenanceTime()

return { ms: ended - started }
}

Expand Down
26 changes: 0 additions & 26 deletions src/db.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const EventEmitter = require('events')
const pg = require('pg')
const { advisoryLock } = require('./plans')

class Db extends EventEmitter {
constructor (config) {
Expand Down Expand Up @@ -43,31 +42,6 @@ class Db extends EventEmitter {
return await this.pool.query(text, values)
}
}

async lock ({ timeout = 30, key } = {}) {
const lockedClient = await this.pool.connect()

const query = `
BEGIN;
SET LOCAL lock_timeout = '${timeout}s';
SET LOCAL idle_in_transaction_session_timeout = '${timeout}s';
${advisoryLock(this.config.schema, key)};
`

await lockedClient.query(query)

const locker = {
unlock: async function () {
try {
await lockedClient.query('COMMIT')
} finally {
lockedClient.release()
}
}
}

return locker
}
}

module.exports = Db
21 changes: 8 additions & 13 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,9 @@ class Manager extends EventEmitter {
async publish (event, ...args) {
assert(event, 'Missing required argument')

const result = await this.db.executeSql(this.getQueuesForEventCommand, [event])
const { rows } = await this.db.executeSql(this.getQueuesForEventCommand, [event])

if (!result || result.rowCount === 0) {
return []
}

return await Promise.all(result.rows.map(({ name }) => this.send(name, ...args)))
return await Promise.all(rows.map(({ name }) => this.send(name, ...args)))
}

async send (...args) {
Expand Down Expand Up @@ -406,10 +402,10 @@ class Manager extends EventEmitter {
]

const db = wrapper || this.db
const result = await db.executeSql(this.insertJobCommand, values)
const { rows } = await db.executeSql(this.insertJobCommand, values)

if (result && result.rowCount === 1) {
return result.rows[0].id
if (rows.length === 1) {
return rows[0].id
}

if (!options.singletonNextSlot) {
Expand Down Expand Up @@ -645,17 +641,16 @@ class Manager extends EventEmitter {
assert(name, 'Missing queue name argument')

const queueSql = plans.getQueueByName(this.config.schema)
const result = await this.db.executeSql(queueSql, [name])
const { rows } = await this.db.executeSql(queueSql, [name])

if (result?.rows?.length) {
if (rows.length) {
Attorney.assertQueueName(name)
const sql = plans.dropPartition(this.config.schema, name)
await this.db.executeSql(sql)
}

const sql = plans.deleteQueueRecords(this.config.schema)
const result2 = await this.db.executeSql(sql, [name])
return result2?.rowCount || null
await this.db.executeSql(sql, [name])
}

async purgeQueue (queue) {
Expand Down
40 changes: 16 additions & 24 deletions src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,10 @@ module.exports = {
getQueueSize,
purgeQueue,
clearStorage,
getMaintenanceTime,
setMaintenanceTime,
getMonitorTime,
setMonitorTime,
getCronTime,
setCronTime,
trySetMaintenanceTime,
trySetMonitorTime,
trySetCronTime,
locked,
advisoryLock,
assertMigration,
getArchivedJobById,
getJobById,
Expand Down Expand Up @@ -271,29 +267,24 @@ function createIndexArchiveArchivedOn (schema) {
return `CREATE INDEX archive_archived_on_idx ON ${schema}.archive(archived_on)`
}

function getMaintenanceTime (schema) {
return `SELECT maintained_on, EXTRACT( EPOCH FROM (now() - maintained_on) ) seconds_ago FROM ${schema}.version`
function trySetMaintenanceTime (schema) {
return trySetTimestamp(schema, 'maintained_on')
}

function setMaintenanceTime (schema) {
return `UPDATE ${schema}.version SET maintained_on = now()`
function trySetMonitorTime (schema) {
return trySetTimestamp(schema, 'monitored_on')
}

function getMonitorTime (schema) {
return `SELECT monitored_on, EXTRACT( EPOCH FROM (now() - monitored_on) ) seconds_ago FROM ${schema}.version`
function trySetCronTime (schema) {
return trySetTimestamp(schema, 'cron_on')
}

function setMonitorTime (schema) {
return `UPDATE ${schema}.version SET monitored_on = now()`
}

function setCronTime (schema, time) {
time = time || 'now()'
return `UPDATE ${schema}.version SET cron_on = ${time}`
}

function getCronTime (schema) {
return `SELECT cron_on, EXTRACT( EPOCH FROM (now() - cron_on) ) seconds_ago FROM ${schema}.version`
function trySetTimestamp (schema, column) {
return `
UPDATE ${schema}.version SET ${column} = now()
WHERE EXTRACT( EPOCH FROM (now() - COALESCE(${column}, now() - interval '1 week') ) ) > $1
RETURNING true
`
}

function createQueue (schema) {
Expand Down Expand Up @@ -767,6 +758,7 @@ function locked (schema, query) {
return `
BEGIN;
SET LOCAL lock_timeout = '30s';
SET LOCAL idle_in_transaction_session_timeout = '30s';
${advisoryLock(schema)};
${query};
COMMIT;
Expand Down
49 changes: 14 additions & 35 deletions src/timekeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class Timekeeper extends EventEmitter {
this.getSchedulesCommand = plans.getSchedules(config.schema)
this.scheduleCommand = plans.schedule(config.schema)
this.unscheduleCommand = plans.unschedule(config.schema)
this.getCronTimeCommand = plans.getCronTime(config.schema)
this.setCronTimeCommand = plans.setCronTime(config.schema)
this.trySetCronTimeCommand = plans.trySetCronTime(config.schema)

this.functions = [
this.schedule,
Expand All @@ -44,27 +43,30 @@ class Timekeeper extends EventEmitter {
}

async start () {
this.stopped = false

// setting the archive config too low breaks the cron 60s debounce interval so don't even try
if (this.config.archiveSeconds < 60 || this.config.archiveFailedSeconds < 60) {
return
}

// cache the clock skew from the db server
this.stopped = false

await this.cacheClockSkew()

try {
await this.manager.createQueue(queues.SEND_IT)
} catch {}

await this.manager.work(queues.SEND_IT, { newJobCheckIntervalSeconds: this.config.cronWorkerIntervalSeconds, teamSize: 50, teamConcurrency: 5 }, (job) => this.onSendIt(job))
const options = {
newJobCheckIntervalSeconds: this.config.cronWorkerIntervalSeconds,
teamSize: 50,
teamConcurrency: 5
}

await this.manager.work(queues.SEND_IT, options, (job) => this.onSendIt(job))

setImmediate(() => this.onCron())

// create monitoring interval to make sure cron hasn't crashed
this.cronMonitorInterval = setInterval(async () => await this.onCron(), this.cronMonitorIntervalMs)
// create monitoring interval to measure and adjust for drift in clock skew
this.skewMonitorInterval = setInterval(async () => await this.cacheClockSkew(), this.skewMonitorIntervalMs)
}

Expand Down Expand Up @@ -117,8 +119,6 @@ class Timekeeper extends EventEmitter {
}

async onCron () {
let locker

try {
if (this.stopped || this.timekeeping) return

Expand All @@ -128,19 +128,15 @@ class Timekeeper extends EventEmitter {

this.timekeeping = true

locker = await this.db.lock({ key: 'timekeeper' })

const { secondsAgo } = await this.getCronTime()
const { rows } = await this.db.executeSql(this.trySetCronTimeCommand, [this.config.cronMonitorIntervalSeconds])

if (secondsAgo > this.config.cronMonitorIntervalSeconds) {
if (rows.length === 1 && !this.stopped) {
await this.cron()
await this.setCronTime()
}
} catch (err) {
this.emit(this.events.error, err)
} finally {
this.timekeeping = false
await locker?.unlock()
}
}

Expand Down Expand Up @@ -198,28 +194,11 @@ class Timekeeper extends EventEmitter {

const values = [name, cron, tz, data, options]

const result = await this.db.executeSql(this.scheduleCommand, values)

return result ? result.rowCount : null
await this.db.executeSql(this.scheduleCommand, values)
}

async unschedule (name) {
const result = await this.db.executeSql(this.unscheduleCommand, [name])
return result ? result.rowCount : null
}

async setCronTime () {
await this.db.executeSql(this.setCronTimeCommand)
}

async getCronTime () {
const { rows } = await this.db.executeSql(this.getCronTimeCommand)

let { cron_on: cronOn, seconds_ago: secondsAgo } = rows[0]

secondsAgo = secondsAgo !== null ? parseFloat(secondsAgo) : 61

return { cronOn, secondsAgo }
await this.db.executeSql(this.unscheduleCommand, [name])
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/databaseTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ describe('database', function () {
const query = 'SELECT something FROM somewhere'

const mydb = {
executeSql: async (text, values) => ({ rows: [], text, rowCount: 0 })
executeSql: async (text, values) => ({ rows: [], text })
}

const boss = new PgBoss({ db: mydb })
Expand Down
Loading

0 comments on commit 132ebb3

Please sign in to comment.