Skip to content

Commit

Permalink
add debounce support on insert() via singletonSeconds (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit authored Aug 28, 2024
1 parent ead2bf0 commit b4965d3
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 6 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pg-boss",
"version": "10.0.6",
"version": "10.1.0",
"description": "Queueing jobs in Postgres from Node.js like a boss",
"main": "./src/index.js",
"engines": {
Expand Down
4 changes: 3 additions & 1 deletion src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@ class Manager extends EventEmitter {
this.config.retryBackoff // 6
]

return await db.executeSql(this.insertJobsCommand, params)
const { rows } = await db.executeSql(this.insertJobsCommand, params)

return (rows.length) ? rows.map(i => i.id) : null
}

getDebounceStartAfter (singletonSeconds, clockOffset) {
Expand Down
7 changes: 6 additions & 1 deletion src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ function insertJobs (schema) {
priority,
start_after,
singleton_key,
singleton_on,
dead_letter,
expire_in,
keep_until,
Expand All @@ -743,6 +744,10 @@ function insertJobs (schema) {
COALESCE(priority, 0) as priority,
j.start_after,
"singletonKey" as singleton_key,
CASE
WHEN "singletonSeconds" IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * ("singletonSeconds" * floor( date_part('epoch', now()) / "singletonSeconds" ))
ELSE NULL
END as singleton_on,
COALESCE("deadLetter", q.dead_letter) as dead_letter,
CASE
WHEN "expireInSeconds" IS NOT NULL THEN "expireInSeconds" * interval '1s'
Expand Down Expand Up @@ -778,7 +783,7 @@ function insertJobs (schema) {
"retryDelay" integer,
"retryBackoff" boolean,
"singletonKey" text,
"singletonOn" text,
"singletonSeconds" integer,
"expireInSeconds" integer,
"keepUntil" timestamp with time zone,
"deadLetter" text
Expand Down
2 changes: 1 addition & 1 deletion src/timekeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Timekeeper extends EventEmitter {
batchSize: 50
}

await this.manager.work(QUEUES.SEND_IT, options, (jobs) => this.manager.insert(jobs.map(i => i.data)))
await this.manager.work(QUEUES.SEND_IT, options, async (jobs) => { await this.manager.insert(jobs.map(i => i.data)) })

setImmediate(() => this.onCron())

Expand Down
1 change: 1 addition & 0 deletions types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ declare namespace PgBoss {
retryBackoff?: boolean;
startAfter?: Date | string;
singletonKey?: string;
singletonSeconds?: number;
expireInSeconds?: number;
keepUntil?: Date | string;
deadLetter?: string;
Expand Down

0 comments on commit b4965d3

Please sign in to comment.