From fa500440b699ba3891ea08b7e5c6bb8f687c1c62 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 05:41:30 +0100 Subject: [PATCH 01/20] Hoist getJob to workerPool --- src/interfaces.ts | 4 ++++ src/main.ts | 12 ++++++++++++ src/worker.ts | 12 ++++-------- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/interfaces.ts b/src/interfaces.ts index 134ce8c8..3bce2307 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1157,3 +1157,7 @@ export interface WorkerPluginContext { hooks: AsyncHooks; resolvedPreset: ResolvedWorkerPreset; } +export type GetJobFunction = ( + workerId: string, + flagsToSkip: string[] | null, +) => Promise; diff --git a/src/main.ts b/src/main.ts index 32bbb7c7..d279a682 100644 --- a/src/main.ts +++ b/src/main.ts @@ -10,6 +10,7 @@ import { } from "./helpers"; import { EnhancedWithPgClient, + GetJobFunction, Job, RunOnceOptions, TaskList, @@ -27,6 +28,7 @@ import { import { Logger } from "./logger"; import SIGNALS, { Signal } from "./signals"; import { failJobs } from "./sql/failJob"; +import { getJob as baseGetJob } from "./sql/getJob"; import { resetLockedAt } from "./sql/resetLockedAt"; import { makeNewWorker } from "./worker"; @@ -831,6 +833,15 @@ export function _runTaskList( `You must not set workerId when concurrency > 1; each worker must have a unique identifier`, ); } + const getJob: GetJobFunction = async (workerId, flagsToSkip) => { + return baseGetJob( + compiledSharedOptions, + withPgClient, + tasks, + workerId, + flagsToSkip, + ); + }; for (let i = 0; i < concurrency; i++) { const worker = makeNewWorker(compiledSharedOptions, { tasks, @@ -840,6 +851,7 @@ export function _runTaskList( workerPool, autostart, workerId, + getJob, }); workerPool._workers.push(worker); const remove = () => { diff --git a/src/worker.ts b/src/worker.ts index 270a127b..9bc6281f 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -5,6 +5,7 @@ import deferred from "./deferred"; import { makeJobHelpers } from "./helpers"; import { EnhancedWithPgClient, + GetJobFunction, Job, PromiseOrDirect, TaskList, @@ -15,7 +16,6 @@ import { import { CompiledSharedOptions } from "./lib"; import { completeJob } from "./sql/completeJob"; import { failJob } from "./sql/failJob"; -import { getJob } from "./sql/getJob"; export function makeNewWorker( compiledSharedOptions: CompiledSharedOptions, @@ -27,6 +27,7 @@ export function makeNewWorker( workerPool: WorkerPool; autostart?: boolean; workerId?: string; + getJob: GetJobFunction; }, ): Worker { const { @@ -37,6 +38,7 @@ export function makeNewWorker( workerPool, autostart = true, workerId = `worker-${randomBytes(9).toString("hex")}`, + getJob, } = params; const { events, @@ -167,13 +169,7 @@ export function makeNewWorker( } events.emit("worker:getJob:start", { worker }); - const jobRow = await getJob( - compiledSharedOptions, - withPgClient, - tasks, - workerId, - flagsToSkip, - ); + const jobRow = await getJob(workerId, flagsToSkip); // `doNext` cannot be executed concurrently, so we know this is safe. // eslint-disable-next-line require-atomic-updates From 57eb29d59f425cfbe1ae1de5d942cd5f570a6f45 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 06:56:15 +0100 Subject: [PATCH 02/20] Switch to using poolId rather than workerId for locking/unlocking jobs --- src/sql/completeJob.ts | 4 ++-- src/sql/failJob.ts | 6 +++--- src/sql/getJob.ts | 4 ++-- src/worker.ts | 11 ++++++++--- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/sql/completeJob.ts b/src/sql/completeJob.ts index 95b87311..404235b6 100644 --- a/src/sql/completeJob.ts +++ b/src/sql/completeJob.ts @@ -4,7 +4,7 @@ import { CompiledSharedOptions } from "../lib"; export async function completeJob( compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, - workerId: string, + poolId: string, job: DbJob, ): Promise { const { @@ -29,7 +29,7 @@ update ${escapedWorkerSchema}._private_job_queues as job_queues set locked_by = null, locked_at = null from j where job_queues.id = j.job_queue_id and job_queues.locked_by = $2::text;`, - values: [job.id, workerId], + values: [job.id, poolId], name: !preparedStatements ? undefined : `complete_job_q/${workerSchema}`, diff --git a/src/sql/failJob.ts b/src/sql/failJob.ts index e76d62f6..8706c2f2 100644 --- a/src/sql/failJob.ts +++ b/src/sql/failJob.ts @@ -4,7 +4,7 @@ import { CompiledSharedOptions } from "../lib"; export async function failJob( compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, - workerId: string, + poolId: string, job: DbJob, message: string, replacementPayload: undefined | unknown[], @@ -40,7 +40,7 @@ where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text;`, values: [ job.id, message, - workerId, + poolId, replacementPayload != null ? JSON.stringify(replacementPayload) : null, @@ -63,7 +63,7 @@ where id = $1::bigint and locked_by = $3::text;`, values: [ job.id, message, - workerId, + poolId, replacementPayload != null ? JSON.stringify(replacementPayload) : null, diff --git a/src/sql/getJob.ts b/src/sql/getJob.ts index a96dfedd..7f1f2247 100644 --- a/src/sql/getJob.ts +++ b/src/sql/getJob.ts @@ -15,7 +15,7 @@ export async function getJob( compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, tasks: TaskList, - workerId: string, + poolId: string, flagsToSkip: string[] | null, ): Promise { const { @@ -172,7 +172,7 @@ with j as ( // TODO: breaking change; change this to more optimal: // `RETURNING id, job_queue_id, task_id, payload`, const values = [ - workerId, + poolId, taskDetails.taskIds, ...(hasFlags ? [flagsToSkip!] : []), ...(useNodeTime ? [new Date().toISOString()] : []), diff --git a/src/worker.ts b/src/worker.ts index 9bc6281f..7377d4e4 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -169,7 +169,7 @@ export function makeNewWorker( } events.emit("worker:getJob:start", { worker }); - const jobRow = await getJob(workerId, flagsToSkip); + const jobRow = await getJob(workerPool.id, flagsToSkip); // `doNext` cannot be executed concurrently, so we know this is safe. // eslint-disable-next-line require-atomic-updates @@ -339,7 +339,7 @@ export function makeNewWorker( await failJob( compiledSharedOptions, withPgClient, - workerId, + workerPool.id, job, message, // "Batch jobs": copy through only the unsuccessful parts of the payload @@ -368,7 +368,12 @@ export function makeNewWorker( ); } - await completeJob(compiledSharedOptions, withPgClient, workerId, job); + await completeJob( + compiledSharedOptions, + withPgClient, + workerPool.id, + job, + ); } events.emit("job:complete", { worker, job, error: err }); } catch (fatalError) { From a771d6365812f688e6b9def9d2f30c256a306904 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 06:59:21 +0100 Subject: [PATCH 03/20] Fix test --- __tests__/main.runTaskListOnce.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/__tests__/main.runTaskListOnce.test.ts b/__tests__/main.runTaskListOnce.test.ts index 1357b0f1..5a286f72 100644 --- a/__tests__/main.runTaskListOnce.test.ts +++ b/__tests__/main.runTaskListOnce.test.ts @@ -663,7 +663,7 @@ test("runs jobs asynchronously", () => expect(q.job_count).toEqual(1); expect(+q.locked_at).toBeGreaterThanOrEqual(+start); expect(+q.locked_at).toBeLessThanOrEqual(+new Date()); - expect(q.locked_by).toEqual(worker.workerId); + expect(q.locked_by).toEqual(worker.workerPool.id); } jobPromise!.resolve(); From 58776778127bdc11da1f1474bc84b281876c9cd8 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 07:04:18 +0100 Subject: [PATCH 04/20] Add release notes --- RELEASE_NOTES.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 55fc7a7a..34646da3 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -19,6 +19,13 @@ to make sure the system as a whole remains consistent. Read more: [Worker Pro Migration](https://worker.graphile.org/docs/pro/migration). +## Pending + +- BREAKING: Jobs and queues are now `locked_by` their `WorkerPool`'s id rather + than the `workerId`. Be sure to upgrade + [Worker Pro](https://worker.graphile.org/docs/pro) at the same time if you're + using it! + ## v0.16.6 - Fix bug in `workerUtils.cleanup()` where queues would not be cleaned up if From 6d7a9a2aa266c02be8416616039e2da1ff31e7a9 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 07:21:38 +0100 Subject: [PATCH 05/20] Update docs and tests to refer to pool rather than worker for locked_at --- __tests__/resetLockedAt.test.ts | 2 +- __tests__/workerUtils.cleanup.test.ts | 20 ++++++------- .../workerUtils.forceUnlockWorkers.test.ts | 28 +++++++++---------- website/docs/jobs-view.md | 2 +- website/docs/pro/migration.md | 19 ++++++++++++- 5 files changed, 44 insertions(+), 27 deletions(-) diff --git a/__tests__/resetLockedAt.test.ts b/__tests__/resetLockedAt.test.ts index bafea79b..2e9d3825 100644 --- a/__tests__/resetLockedAt.test.ts +++ b/__tests__/resetLockedAt.test.ts @@ -32,7 +32,7 @@ test("main will execute jobs as they come up, and exits cleanly", () => `\ update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs set - locked_by = 'some_worker_id', + locked_by = 'some_pool_id', locked_at = now() - ( case payload->>'id' when 'locked_recently' then interval '5 minutes' diff --git a/__tests__/workerUtils.cleanup.test.ts b/__tests__/workerUtils.cleanup.test.ts index 03744bff..762f1690 100644 --- a/__tests__/workerUtils.cleanup.test.ts +++ b/__tests__/workerUtils.cleanup.test.ts @@ -71,18 +71,18 @@ test("cleanup with GC_JOB_QUEUES", () => }); const jobs: Job[] = []; - const WORKER_ID_1 = "worker1"; - const WORKER_ID_2 = "worker2"; - const WORKER_ID_3 = "worker3"; + const POOL_ID_1 = "pool-1"; + const POOL_ID_2 = "pool-2"; + const POOL_ID_3 = "pool-3"; let a = 0; const date = new Date(); const specs = [ - [WORKER_ID_1, "test", "test_job1"], - [WORKER_ID_2, "test2", "test_job2"], - [WORKER_ID_3, "test3", "test_job3"], + [POOL_ID_1, "test", "test_job1"], + [POOL_ID_2, "test2", "test_job2"], + [POOL_ID_3, "test3", "test_job3"], [null, null, "test_job4"], ] as const; - for (const [workerId, queueName, taskIdentifier] of specs) { + for (const [poolId, queueName, taskIdentifier] of specs) { date.setMinutes(date.getMinutes() - 1); const job = await utils.addJob( taskIdentifier, @@ -90,7 +90,7 @@ test("cleanup with GC_JOB_QUEUES", () => { queueName: queueName ?? undefined }, ); jobs.push(job); - if (workerId) { + if (poolId) { await pgClient.query( `\ with j as ( @@ -107,7 +107,7 @@ with j as ( where job_queues.id = j.job_queue_id ) select * from j`, - [date.toISOString(), workerId, job.id], + [date.toISOString(), poolId, job.id], ); } } @@ -121,7 +121,7 @@ select * from j`, "test3", ]); - await utils.forceUnlockWorkers(["worker3"]); + await utils.forceUnlockWorkers([POOL_ID_3]); const thirdJob = jobs[2]; // Belongs to queueName 'task3' await utils.completeJobs([thirdJob.id]); await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] }); diff --git a/__tests__/workerUtils.forceUnlockWorkers.test.ts b/__tests__/workerUtils.forceUnlockWorkers.test.ts index 792e77a0..cf512556 100644 --- a/__tests__/workerUtils.forceUnlockWorkers.test.ts +++ b/__tests__/workerUtils.forceUnlockWorkers.test.ts @@ -29,24 +29,24 @@ test("unlocks jobs for the given workers, leaves others unaffected", () => }); const jobs: Job[] = []; - const WORKER_ID_1 = "worker1"; - const WORKER_ID_2 = "worker2"; - const WORKER_ID_3 = "worker3"; + const POOL_ID_1 = "pool-1"; + const POOL_ID_2 = "pool-2"; + const POOL_ID_3 = "pool-3"; let a = 0; const date = new Date(); const specs = [ - [WORKER_ID_1, null], - [WORKER_ID_1, "test"], - [WORKER_ID_2, null], - [WORKER_ID_2, "test2"], - [WORKER_ID_2, "test3"], - [WORKER_ID_3, null], + [POOL_ID_1, null], + [POOL_ID_1, "test"], + [POOL_ID_2, null], + [POOL_ID_2, "test2"], + [POOL_ID_2, "test3"], + [POOL_ID_3, null], [null, null], [null, "test"], [null, "test2"], [null, "test3"], ] as const; - for (const [workerId, queueName] of specs) { + for (const [poolId, queueName] of specs) { date.setMinutes(date.getMinutes() - 1); const job = await utils.addJob( "job3", @@ -58,7 +58,7 @@ test("unlocks jobs for the given workers, leaves others unaffected", () => update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs set locked_at = $1, locked_by = $2 where id = $3`, - [workerId ? date.toISOString() : null, workerId, job.id], + [poolId ? date.toISOString() : null, poolId, job.id], ); jobs.push(job); } @@ -69,7 +69,7 @@ set locked_at = jobs.locked_at, locked_by = jobs.locked_by from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs where jobs.job_queue_id = job_queues.id;`, ); - await utils.forceUnlockWorkers([WORKER_ID_2, WORKER_ID_3]); + await utils.forceUnlockWorkers([POOL_ID_2, POOL_ID_3]); const remaining = await getJobs(pgClient); remaining.sort((a, z) => Number(a.id) - Number(z.id)); @@ -79,7 +79,7 @@ where jobs.job_queue_id = job_queues.id;`, const job = jobs[i]; const updatedJob = remaining[i]; expect(updatedJob.id).toEqual(job.id); - if (spec[0] === WORKER_ID_2 || spec[0] === WORKER_ID_3) { + if (spec[0] === POOL_ID_2 || spec[0] === POOL_ID_3) { expect(updatedJob.locked_by).toBeNull(); expect(updatedJob.locked_at).toBeNull(); } else if (spec[0]) { @@ -97,7 +97,7 @@ where jobs.job_queue_id = job_queues.id;`, expect(lockedQueues).toEqual([ expect.objectContaining({ queue_name: "test", - locked_by: WORKER_ID_1, + locked_by: POOL_ID_1, }), ]); })); diff --git a/website/docs/jobs-view.md b/website/docs/jobs-view.md index dbf2899e..fed590c7 100644 --- a/website/docs/jobs-view.md +++ b/website/docs/jobs-view.md @@ -49,7 +49,7 @@ performance issues! - `updated_at` - when the job was last updated - `key` - the `job_key` of the job, if any - `locked_at` - when the job was locked, if locked -- `locked_by` - the worker id that the job was locked by, if locked +- `locked_by` - the WorkerPool id that the job was locked by, if locked - `revision` - the revision number of the job, bumped each time the record is updated - `flags` - the [forbidden flags](/docs/forbidden-flags) associated with this diff --git a/website/docs/pro/migration.md b/website/docs/pro/migration.md index bd396690..3ca8d6c3 100644 --- a/website/docs/pro/migration.md +++ b/website/docs/pro/migration.md @@ -57,7 +57,7 @@ where locked_by is not null and locked_by not in ( commit; ``` -For Graphile Worker v0.16.0+ it would be: +For Graphile Worker v0.16.x it would be: ```sql begin; @@ -73,3 +73,20 @@ where locked_by is not null and locked_by not in ( ); commit; ``` + +For Graphile Worker v0.17.x+ it would be: + +```sql +begin; +update graphile_worker._private_jobs as jobs +set locked_at = null, locked_by = null +where locked_by is not null and locked_by not in ( + select pool_id from graphile_worker._private_pro_pools +); +update graphile_worker._private_job_queues as job_queues +set locked_at = null, locked_by = null +where locked_by is not null and locked_by not in ( + select pool_id from graphile_worker._private_pro_pools +); +commit; +``` From f481a42cf78c56c4bb6fe4e7c9bab74f61928cba Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 08:14:32 +0100 Subject: [PATCH 06/20] Switch failJobs to using poolId too --- src/main.ts | 4 ++-- src/sql/failJob.ts | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main.ts b/src/main.ts index d279a682..a1b8e343 100644 --- a/src/main.ts +++ b/src/main.ts @@ -687,7 +687,7 @@ export function _runTaskList( const cancelledJobs = await failJobs( compiledSharedOptions, withPgClient, - workerIds, + workerPool.id, jobsToRelease, message, ); @@ -761,7 +761,7 @@ export function _runTaskList( const cancelledJobs = await failJobs( compiledSharedOptions, withPgClient, - workerIds, + workerPool.id, jobsInProgress, message, ); diff --git a/src/sql/failJob.ts b/src/sql/failJob.ts index 8706c2f2..63a08884 100644 --- a/src/sql/failJob.ts +++ b/src/sql/failJob.ts @@ -77,7 +77,7 @@ where id = $1::bigint and locked_by = $3::text;`, export async function failJobs( compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, - workerIds: string[], + poolId: string, jobs: DbJob[], message: string, ): Promise { @@ -100,16 +100,16 @@ last_error = $2::text, run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'), locked_by = null, locked_at = null -where id = any($1::int[]) and locked_by = any($3::text[]) +where id = any($1::int[]) and locked_by = $3::text returning * ), queues as ( update ${escapedWorkerSchema}._private_job_queues as job_queues set locked_by = null, locked_at = null from j -where job_queues.id = j.job_queue_id and job_queues.locked_by = any($3::text[]) +where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text ) select * from j;`, - values: [jobs.map((job) => job.id), message, workerIds], + values: [jobs.map((job) => job.id), message, poolId], name: !preparedStatements ? undefined : `fail_jobs/${workerSchema}`, }), ); From 88de944758fde127a3c12be7b80231775cf83b12 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 06:16:13 +0100 Subject: [PATCH 07/20] Start work on batch fetching jobs --- src/index.ts | 43 ++++++++++++++++++++++++++++++++ src/main.ts | 62 +++++++++++++++++++++++++++++++++++++++++------ src/sql/getJob.ts | 26 ++++++++++---------- 3 files changed, 110 insertions(+), 21 deletions(-) diff --git a/src/index.ts b/src/index.ts index 1b280120..1072bbd2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -154,6 +154,49 @@ declare global { logger?: Logger; events?: WorkerEvents; + + /** + * To enable processing jobs in batches, set this to an integer larger + * than 1. This will result in jobs being fetched by the pool rather than + * the worker, the pool will fetch (and lock!) `getJobBatchSize` jobs up + * front, and each time a worker requests a job it will be served from + * this list until the list is exhausted, at which point a new set of + * jobs will be fetched (and locked). + * + * This setting can help reduce the load on your database from looking + * for jobs, but is only really effective when there are often many jobs + * queued and ready to go, and can increase the latency of job execution + * because a single worker may lock jobs into its queue leaving other + * workers idle. + * + * @default `1` + */ + getJobBatchSize?: number; + + /** + * The time in milliseconds to wait after a `completeJob` call to see if + * there are any other completeJob calls that can be batched together. A + * setting of `-1` disables this. + * + * Enabling this feature increases the time for which jobs are locked + * past completion, thus increasing the risk of catastrophic failure + * resulting in the jobs being executed again once they expire. + * + * @default `-1` + */ + completeJobBatchDelay?: number; + + /** + * The time in milliseconds to wait after a `failJob` call to see if + * there are any other failJob calls that can be batched together. A + * setting of `-1` disables this. + * + * Enabling this feature increases the time for which jobs are locked + * past failure. + * + * @default `-1` + */ + failJobBatchDelay?: number; } interface Preset { worker?: WorkerOptions; diff --git a/src/main.ts b/src/main.ts index a1b8e343..40a4a57f 100644 --- a/src/main.ts +++ b/src/main.ts @@ -530,7 +530,13 @@ export function _runTaskList( ): WorkerPool { const { resolvedPreset: { - worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout }, + worker: { + concurrentJobs: baseConcurrency, + gracefulShutdownAbortTimeout, + getJobBatchSize = 1, + completeJobBatchDelay = -1, + failJobBatchDelay = -1, + }, }, _rawOptions: { noHandleSignals = false }, } = compiledSharedOptions; @@ -833,14 +839,54 @@ export function _runTaskList( `You must not set workerId when concurrency > 1; each worker must have a unique identifier`, ); } + let jobQueue: Job[] = []; + let nextJobs: Promise | null = null; const getJob: GetJobFunction = async (workerId, flagsToSkip) => { - return baseGetJob( - compiledSharedOptions, - withPgClient, - tasks, - workerId, - flagsToSkip, - ); + if (flagsToSkip !== null || getJobBatchSize <= 1) { + const jobs = await baseGetJob( + compiledSharedOptions, + withPgClient, + tasks, + workerId, + flagsToSkip, + 1, + ); + return jobs[0]; + } else { + const job = jobQueue.pop(); + if (job) { + // Queue already has a job, run that + return job; + } else { + if (!nextJobs) { + // Queue is empty, no fetch of jobs in progress; let's fetch them + nextJobs = (async () => { + const jobs = await baseGetJob( + compiledSharedOptions, + withPgClient, + tasks, + workerId, + flagsToSkip, + getJobBatchSize, + ); + jobQueue = jobs.reverse(); + // Return true if we fetched the full batch size + const fetchAgain = jobs.length >= getJobBatchSize; + return fetchAgain; + })(); + } + /** If true, the full batch size was fetched, so if the queue is exhausted again it's likely that there will be more jobs */ + const fetchAgain = await nextJobs; + const job = jobQueue.pop(); + if (job) { + return job; + } else if (fetchAgain) { + return getJob(workerId, flagsToSkip); + } else { + return undefined; + } + } + } }; for (let i = 0; i < concurrency; i++) { const worker = makeNewWorker(compiledSharedOptions, { diff --git a/src/sql/getJob.ts b/src/sql/getJob.ts index 7f1f2247..eb3bb15b 100644 --- a/src/sql/getJob.ts +++ b/src/sql/getJob.ts @@ -17,7 +17,9 @@ export async function getJob( tasks: TaskList, poolId: string, flagsToSkip: string[] | null, -): Promise { + rawBatchSize: number, +): Promise { + const batchSize = parseInt(String(rawBatchSize), 10) || 1; const { escapedWorkerSchema, workerSchema, @@ -38,7 +40,7 @@ export async function getJob( if (taskDetails.taskIds.length === 0) { logger.error("No tasks found; nothing to do!"); - return undefined; + return []; } let i = 2; @@ -157,7 +159,7 @@ with j as ( ${queueClause} ${flagsClause} order by priority asc, run_at asc - limit 1 + limit ${batchSize} for update skip locked )${updateQueue} @@ -179,23 +181,21 @@ with j as ( ]; const name = !preparedStatements ? undefined - : `get_job${hasFlags ? "F" : ""}${useNodeTime ? "N" : ""}/${workerSchema}`; + : `get_job${batchSize === 1 ? "" : batchSize}${hasFlags ? "F" : ""}${ + useNodeTime ? "N" : "" + }/${workerSchema}`; - const { - rows: [jobRow], - } = await withPgClient.withRetries((client) => + const { rows } = await withPgClient.withRetries((client) => client.query({ text, values, name, }), ); - if (jobRow) { - return Object.assign(jobRow, { + return rows.reverse().map((jobRow) => + Object.assign(jobRow, { task_identifier: taskDetails.supportedTaskIdentifierByTaskId[jobRow.task_id], - }); - } else { - return undefined; - } + }), + ); } From 454aed5cfc2e5a8026a58a79998b68ca6dc8bd5d Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 06:46:20 +0100 Subject: [PATCH 08/20] Batched job fetching with watermark --- src/index.ts | 2 +- src/main.ts | 69 ++++++++++++++++++++++++++--------------------- src/sql/getJob.ts | 2 +- 3 files changed, 40 insertions(+), 33 deletions(-) diff --git a/src/index.ts b/src/index.ts index 1072bbd2..b0a6f81e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -169,7 +169,7 @@ declare global { * because a single worker may lock jobs into its queue leaving other * workers idle. * - * @default `1` + * @default `-1` */ getJobBatchSize?: number; diff --git a/src/main.ts b/src/main.ts index 40a4a57f..efd052b6 100644 --- a/src/main.ts +++ b/src/main.ts @@ -533,7 +533,7 @@ export function _runTaskList( worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout, - getJobBatchSize = 1, + getJobBatchSize = -1, completeJobBatchDelay = -1, failJobBatchDelay = -1, }, @@ -841,8 +841,42 @@ export function _runTaskList( } let jobQueue: Job[] = []; let nextJobs: Promise | null = null; + let getJobCounter = 0; + let getJobBaseline = 0; + const batchGetJob = async (myFetchId: number): Promise => { + if (!nextJobs) { + // Queue is empty, no fetch of jobs in progress; let's fetch them. + getJobBaseline = getJobCounter; + nextJobs = (async () => { + const jobs = await baseGetJob( + compiledSharedOptions, + withPgClient, + tasks, + workerPool.id, // << NOTE: This is the worker pool id, not the worker id! + null, + getJobBatchSize, + ); + jobQueue = jobs.reverse(); + return jobs.length >= getJobBatchSize; + })().finally(() => { + nextJobs = null; + }); + } + const fetchedMax = await nextJobs; + const job = jobQueue.pop(); + if (job) { + return job; + } else if (fetchedMax || myFetchId > getJobBaseline) { + // Either we fetched as many jobs as we could and there still weren't + // enough, or we requested a job after the request for jobs was sent to + // the database. Either way, let's fetch again. + return batchGetJob(myFetchId); + } else { + return undefined; + } + }; const getJob: GetJobFunction = async (workerId, flagsToSkip) => { - if (flagsToSkip !== null || getJobBatchSize <= 1) { + if (flagsToSkip !== null || getJobBatchSize < 1) { const jobs = await baseGetJob( compiledSharedOptions, withPgClient, @@ -854,37 +888,10 @@ export function _runTaskList( return jobs[0]; } else { const job = jobQueue.pop(); - if (job) { - // Queue already has a job, run that + if (job !== undefined) { return job; } else { - if (!nextJobs) { - // Queue is empty, no fetch of jobs in progress; let's fetch them - nextJobs = (async () => { - const jobs = await baseGetJob( - compiledSharedOptions, - withPgClient, - tasks, - workerId, - flagsToSkip, - getJobBatchSize, - ); - jobQueue = jobs.reverse(); - // Return true if we fetched the full batch size - const fetchAgain = jobs.length >= getJobBatchSize; - return fetchAgain; - })(); - } - /** If true, the full batch size was fetched, so if the queue is exhausted again it's likely that there will be more jobs */ - const fetchAgain = await nextJobs; - const job = jobQueue.pop(); - if (job) { - return job; - } else if (fetchAgain) { - return getJob(workerId, flagsToSkip); - } else { - return undefined; - } + return batchGetJob(++getJobCounter); } } }; diff --git a/src/sql/getJob.ts b/src/sql/getJob.ts index eb3bb15b..73d174a9 100644 --- a/src/sql/getJob.ts +++ b/src/sql/getJob.ts @@ -192,7 +192,7 @@ with j as ( name, }), ); - return rows.reverse().map((jobRow) => + return rows.map((jobRow) => Object.assign(jobRow, { task_identifier: taskDetails.supportedTaskIdentifierByTaskId[jobRow.task_id], From ec3893543241c362114af2a11ae929ef0d943808 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 07:31:09 +0100 Subject: [PATCH 09/20] Fix getJob call to reflect changes in #469 --- src/main.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main.ts b/src/main.ts index efd052b6..8192bedf 100644 --- a/src/main.ts +++ b/src/main.ts @@ -852,7 +852,7 @@ export function _runTaskList( compiledSharedOptions, withPgClient, tasks, - workerPool.id, // << NOTE: This is the worker pool id, not the worker id! + workerPool.id, null, getJobBatchSize, ); @@ -875,13 +875,13 @@ export function _runTaskList( return undefined; } }; - const getJob: GetJobFunction = async (workerId, flagsToSkip) => { + const getJob: GetJobFunction = async (_workerId, flagsToSkip) => { if (flagsToSkip !== null || getJobBatchSize < 1) { const jobs = await baseGetJob( compiledSharedOptions, withPgClient, tasks, - workerId, + workerPool.id, flagsToSkip, 1, ); From 2b6002813ec3a73304476a3b83a5a374600685b5 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 07:56:49 +0100 Subject: [PATCH 10/20] Hoist completeJob and failJob --- src/interfaces.ts | 7 +++++++ src/main.ts | 28 +++++++++++++++++++++++++++- src/worker.ts | 18 +++++++----------- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/src/interfaces.ts b/src/interfaces.ts index 3bce2307..e092541b 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1161,3 +1161,10 @@ export type GetJobFunction = ( workerId: string, flagsToSkip: string[] | null, ) => Promise; + +export type CompleteJobFunction = (job: DbJob) => Promise; +export type FailJobFunction = ( + job: DbJob, + message: string, + replacementPayload: undefined | unknown[], +) => Promise; diff --git a/src/main.ts b/src/main.ts index 8192bedf..8e0aaf72 100644 --- a/src/main.ts +++ b/src/main.ts @@ -9,7 +9,9 @@ import { makeWithPgClientFromPool, } from "./helpers"; import { + CompleteJobFunction, EnhancedWithPgClient, + FailJobFunction, GetJobFunction, Job, RunOnceOptions, @@ -27,7 +29,8 @@ import { } from "./lib"; import { Logger } from "./logger"; import SIGNALS, { Signal } from "./signals"; -import { failJobs } from "./sql/failJob"; +import { completeJob as baseCompleteJob } from "./sql/completeJob"; +import { failJob as baseFailJob, failJobs } from "./sql/failJob"; import { getJob as baseGetJob } from "./sql/getJob"; import { resetLockedAt } from "./sql/resetLockedAt"; import { makeNewWorker } from "./worker"; @@ -895,6 +898,27 @@ export function _runTaskList( } } }; + + const completeJob: CompleteJobFunction = async (job) => { + return baseCompleteJob( + compiledSharedOptions, + withPgClient, + workerPool.id, + job, + ); + }; + + const failJob: FailJobFunction = async (job, message, replacementPayload) => { + return baseFailJob( + compiledSharedOptions, + withPgClient, + workerPool.id, + job, + message, + replacementPayload, + ); + }; + for (let i = 0; i < concurrency; i++) { const worker = makeNewWorker(compiledSharedOptions, { tasks, @@ -905,6 +929,8 @@ export function _runTaskList( autostart, workerId, getJob, + completeJob, + failJob, }); workerPool._workers.push(worker); const remove = () => { diff --git a/src/worker.ts b/src/worker.ts index 7377d4e4..9fb3d00c 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -4,7 +4,9 @@ import { randomBytes } from "crypto"; import deferred from "./deferred"; import { makeJobHelpers } from "./helpers"; import { + CompleteJobFunction, EnhancedWithPgClient, + FailJobFunction, GetJobFunction, Job, PromiseOrDirect, @@ -14,8 +16,6 @@ import { WorkerSharedOptions, } from "./interfaces"; import { CompiledSharedOptions } from "./lib"; -import { completeJob } from "./sql/completeJob"; -import { failJob } from "./sql/failJob"; export function makeNewWorker( compiledSharedOptions: CompiledSharedOptions, @@ -28,6 +28,8 @@ export function makeNewWorker( autostart?: boolean; workerId?: string; getJob: GetJobFunction; + completeJob: CompleteJobFunction; + failJob: FailJobFunction; }, ): Worker { const { @@ -39,6 +41,8 @@ export function makeNewWorker( autostart = true, workerId = `worker-${randomBytes(9).toString("hex")}`, getJob, + completeJob, + failJob, } = params; const { events, @@ -337,9 +341,6 @@ export function makeNewWorker( { failure: true, job, error: err, duration }, ); await failJob( - compiledSharedOptions, - withPgClient, - workerPool.id, job, message, // "Batch jobs": copy through only the unsuccessful parts of the payload @@ -368,12 +369,7 @@ export function makeNewWorker( ); } - await completeJob( - compiledSharedOptions, - withPgClient, - workerPool.id, - job, - ); + await completeJob(job); } events.emit("job:complete", { worker, job, error: err }); } catch (fatalError) { From 2695b5096b6216224fb110f72331ee42fbeaae36 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 08:55:53 +0100 Subject: [PATCH 11/20] Refactor failJob/completeJob in preparation for batching --- src/main.ts | 54 +++++++++++++++++++++++------------ src/sql/completeJob.ts | 25 +++++++++++----- src/sql/failJob.ts | 65 ++++++++++++++++++++++++++++-------------- 3 files changed, 96 insertions(+), 48 deletions(-) diff --git a/src/main.ts b/src/main.ts index 8e0aaf72..4b2af2e6 100644 --- a/src/main.ts +++ b/src/main.ts @@ -899,25 +899,41 @@ export function _runTaskList( } }; - const completeJob: CompleteJobFunction = async (job) => { - return baseCompleteJob( - compiledSharedOptions, - withPgClient, - workerPool.id, - job, - ); - }; - - const failJob: FailJobFunction = async (job, message, replacementPayload) => { - return baseFailJob( - compiledSharedOptions, - withPgClient, - workerPool.id, - job, - message, - replacementPayload, - ); - }; + const completeJob: CompleteJobFunction = + completeJobBatchDelay >= 0 + ? async (job) => { + return baseCompleteJob( + compiledSharedOptions, + withPgClient, + workerPool.id, + [job], + ); + } + : (job) => + baseCompleteJob(compiledSharedOptions, withPgClient, workerPool.id, [ + job, + ]); + + const failJob: FailJobFunction = + failJobBatchDelay >= 0 + ? async (job, message, replacementPayload) => { + return baseFailJob( + compiledSharedOptions, + withPgClient, + workerPool.id, + [ + { + job, + message, + replacementPayload, + }, + ], + ); + } + : (job, message, replacementPayload) => + baseFailJob(compiledSharedOptions, withPgClient, workerPool.id, [ + { job, message, replacementPayload }, + ]); for (let i = 0; i < concurrency; i++) { const worker = makeNewWorker(compiledSharedOptions, { diff --git a/src/sql/completeJob.ts b/src/sql/completeJob.ts index 404235b6..f64a827b 100644 --- a/src/sql/completeJob.ts +++ b/src/sql/completeJob.ts @@ -5,7 +5,7 @@ export async function completeJob( compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, poolId: string, - job: DbJob, + jobs: ReadonlyArray, ): Promise { const { escapedWorkerSchema, @@ -15,33 +15,44 @@ export async function completeJob( }, } = compiledSharedOptions; + const jobsWithQueues: DbJob[] = []; + const jobsWithoutQueues: DbJob[] = []; + for (const job of jobs) { + if (job.job_queue_id != null) { + jobsWithQueues.push(job); + } else { + jobsWithoutQueues.push(job); + } + } + // TODO: retry logic, in case of server connection interruption - if (job.job_queue_id != null) { + if (jobsWithQueues.length > 0) { await withPgClient.withRetries((client) => client.query({ text: `\ with j as ( delete from ${escapedWorkerSchema}._private_jobs as jobs -where id = $1::bigint +where id = ANY($1::bigint[]) returning * ) update ${escapedWorkerSchema}._private_job_queues as job_queues set locked_by = null, locked_at = null from j where job_queues.id = j.job_queue_id and job_queues.locked_by = $2::text;`, - values: [job.id, poolId], + values: [jobsWithQueues.map((j) => j.id), poolId], name: !preparedStatements ? undefined : `complete_job_q/${workerSchema}`, }), ); - } else { + } + if (jobsWithoutQueues.length > 0) { await withPgClient.withRetries((client) => client.query({ text: `\ delete from ${escapedWorkerSchema}._private_jobs as jobs -where id = $1::bigint`, - values: [job.id], +where id = ANY($1::bigint[])`, + values: [jobsWithoutQueues.map((j) => j.id)], name: !preparedStatements ? undefined : `complete_job/${workerSchema}`, }), ); diff --git a/src/sql/failJob.ts b/src/sql/failJob.ts index 63a08884..a86db5e5 100644 --- a/src/sql/failJob.ts +++ b/src/sql/failJob.ts @@ -1,13 +1,16 @@ import { DbJob, EnhancedWithPgClient } from "../interfaces"; import { CompiledSharedOptions } from "../lib"; +interface Spec { + job: DbJob; + message: string; + replacementPayload: undefined | unknown[]; +} export async function failJob( compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, poolId: string, - job: DbJob, - message: string, - replacementPayload: undefined | unknown[], + specs: ReadonlyArray, ): Promise { const { escapedWorkerSchema, @@ -17,56 +20,74 @@ export async function failJob( }, } = compiledSharedOptions; + const specsWithQueues: Spec[] = []; + const specsWithoutQueues: Spec[] = []; + + for (const spec of specs) { + if (spec.job.job_queue_id != null) { + specsWithQueues.push(spec); + } else { + specsWithoutQueues.push(spec); + } + } + // TODO: retry logic, in case of server connection interruption - if (job.job_queue_id != null) { + if (specsWithQueues.length > 0) { await withPgClient.withRetries((client) => client.query({ text: `\ with j as ( update ${escapedWorkerSchema}._private_jobs as jobs set -last_error = $2::text, +last_error = (el->>'message'), run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'), locked_by = null, locked_at = null, -payload = coalesce($4::json, jobs.payload) -where id = $1::bigint and locked_by = $3::text +payload = coalesce(el->'payload', jobs.payload) +from json_array_elements($2::json) as els(el) +where id = (el->>'jobId')::bigint and locked_by = $1::text returning * ) update ${escapedWorkerSchema}._private_job_queues as job_queues set locked_by = null, locked_at = null from j -where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text;`, +where job_queues.id = j.job_queue_id and job_queues.locked_by = $1::text;`, values: [ - job.id, - message, poolId, - replacementPayload != null - ? JSON.stringify(replacementPayload) - : null, + JSON.stringify( + specsWithQueues.map(({ job, message, replacementPayload }) => ({ + jobId: job.id, + message, + payload: replacementPayload, + })), + ), ], name: !preparedStatements ? undefined : `fail_job_q/${workerSchema}`, }), ); - } else { + } + if (specsWithoutQueues.length > 0) { await withPgClient.withRetries((client) => client.query({ text: `\ update ${escapedWorkerSchema}._private_jobs as jobs set -last_error = $2::text, +last_error = (el->>'message'), run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'), locked_by = null, locked_at = null, -payload = coalesce($4::json, jobs.payload) -where id = $1::bigint and locked_by = $3::text;`, +payload = coalesce(el->'payload', jobs.payload) +from json_array_elements($2::json) as els(el) +where id = (el->>'jobId')::bigint and locked_by = $1::text;`, values: [ - job.id, - message, poolId, - replacementPayload != null - ? JSON.stringify(replacementPayload) - : null, + JSON.stringify( + specsWithoutQueues.map(({ job, message, replacementPayload }) => ({ + jobId: job.id, + message, + payload: replacementPayload, + })), + ), ], name: !preparedStatements ? undefined : `fail_job/${workerSchema}`, }), From 63bbceb0cfae109cdc5d2ffe4e5d380a921aa3ad Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 09:04:02 +0100 Subject: [PATCH 12/20] Stub batch function --- src/main.ts | 59 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/src/main.ts b/src/main.ts index 4b2af2e6..33d2f1ee 100644 --- a/src/main.ts +++ b/src/main.ts @@ -901,14 +901,17 @@ export function _runTaskList( const completeJob: CompleteJobFunction = completeJobBatchDelay >= 0 - ? async (job) => { - return baseCompleteJob( - compiledSharedOptions, - withPgClient, - workerPool.id, - [job], - ); - } + ? batch( + completeJobBatchDelay, + (job) => job, + (jobs) => + baseCompleteJob( + compiledSharedOptions, + withPgClient, + workerPool.id, + jobs, + ), + ) : (job) => baseCompleteJob(compiledSharedOptions, withPgClient, workerPool.id, [ job, @@ -916,20 +919,21 @@ export function _runTaskList( const failJob: FailJobFunction = failJobBatchDelay >= 0 - ? async (job, message, replacementPayload) => { - return baseFailJob( - compiledSharedOptions, - withPgClient, - workerPool.id, - [ - { - job, - message, - replacementPayload, - }, - ], - ); - } + ? batch( + failJobBatchDelay, + (job, message, replacementPayload) => ({ + job, + message, + replacementPayload, + }), + (specs) => + baseFailJob( + compiledSharedOptions, + withPgClient, + workerPool.id, + specs, + ), + ) : (job, message, replacementPayload) => baseFailJob(compiledSharedOptions, withPgClient, workerPool.id, [ { job, message, replacementPayload }, @@ -1012,3 +1016,14 @@ export const runTaskListOnce = ( return pool; }; + +function batch( + delay: number, + makeSpec: (...args: TArgs) => TSpec, + callback: (specs: ReadonlyArray) => Promise, +): (...args: TArgs) => Promise { + return (...args) => { + const spec = makeSpec(...args); + return callback([spec]); + }; +} From daa110e09f8a9597632f3e0f000258933e2082cf Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 09:09:04 +0100 Subject: [PATCH 13/20] Implement batching function --- src/main.ts | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/main.ts b/src/main.ts index 33d2f1ee..fa63d34e 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1022,8 +1022,25 @@ function batch( makeSpec: (...args: TArgs) => TSpec, callback: (specs: ReadonlyArray) => Promise, ): (...args: TArgs) => Promise { + let currentBatch: { specs: TSpec[]; promise: Promise } | null = null; return (...args) => { const spec = makeSpec(...args); - return callback([spec]); + if (currentBatch) { + currentBatch.specs.push(spec); + } else { + const specs = [spec]; + currentBatch = { + specs, + promise: (async () => { + await sleep(delay); + currentBatch = null; + return callback(specs); + })(), + }; + } + return currentBatch.promise; }; } + +const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); From 454d22083df58ceca2ef35d108f30158e7adfd91 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 09:34:51 +0100 Subject: [PATCH 14/20] Lint fix --- src/main.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.ts b/src/main.ts index fa63d34e..6b1ea5eb 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1017,7 +1017,7 @@ export const runTaskListOnce = ( return pool; }; -function batch( +function batch( delay: number, makeSpec: (...args: TArgs) => TSpec, callback: (specs: ReadonlyArray) => Promise, From 1e818db4f39730314a1f6f6574a8886e55829abc Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 10:36:16 +0100 Subject: [PATCH 15/20] Tweak graceful/forceful shutdown handover --- src/interfaces.ts | 2 ++ src/main.ts | 32 +++++++++++++++++++++++++++----- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/interfaces.ts b/src/interfaces.ts index e092541b..bb709f2b 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -445,6 +445,8 @@ export interface WorkerPool { /** @internal */ _shuttingDown: boolean; /** @internal */ + _forcefulShuttingDown: boolean; + /** @internal */ _active: boolean; /** @internal */ _workers: Worker[]; diff --git a/src/main.ts b/src/main.ts index 6b1ea5eb..1cb4cf95 100644 --- a/src/main.ts +++ b/src/main.ts @@ -586,9 +586,13 @@ export function _runTaskList( unregisterSignalHandlers(); } } else { - logger.error( - `Graphile Worker internal error: terminate() was called twice for worker pool. Ignoring second call; but this indicates a bug - please file an issue.`, - ); + try { + throw new Error( + `Graphile Worker internal error: terminate() was called twice for worker pool. Ignoring second call; but this indicates a bug - please file an issue.`, + ); + } catch (e) { + logger.error(String(e.stack)); + } } } @@ -601,6 +605,7 @@ export function _runTaskList( id: `${continuous ? "pool" : "otpool"}-${randomBytes(9).toString("hex")}`, _active: true, _shuttingDown: false, + _forcefulShuttingDown: false, _workers: [], _withPgClient: withPgClient, get worker() { @@ -621,6 +626,12 @@ export function _runTaskList( async gracefulShutdown( message = "Worker pool is shutting down gracefully", ) { + if (workerPool._forcefulShuttingDown) { + logger.error( + `gracefulShutdown called when forcefulShutdown is already in progress`, + ); + return; + } if (workerPool._shuttingDown) { logger.error( `gracefulShutdown called when gracefulShutdown is already in progress`, @@ -720,13 +731,22 @@ export function _runTaskList( }); return this.forcefulShutdown(e.message); } - terminate(); + if (!terminated) { + terminate(); + } }, /** * Stop accepting jobs and "fail" all currently running jobs. */ async forcefulShutdown(message: string) { + if (workerPool._forcefulShuttingDown) { + logger.error( + `forcefulShutdown called when forcefulShutdown is already in progress`, + ); + return; + } + workerPool._forcefulShuttingDown = true; events.emit("pool:forcefulShutdown", { pool: workerPool, workerPool, @@ -795,7 +815,9 @@ export function _runTaskList( error: e, }); } - terminate(); + if (!terminated) { + terminate(); + } }, promise, From 22e8b19ab9782a9c5b4d041d09a4339c9e418129 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 10:36:36 +0100 Subject: [PATCH 16/20] Evaluate envvar just once up top --- src/worker.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/worker.ts b/src/worker.ts index 9fb3d00c..a99c74c0 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -17,6 +17,8 @@ import { } from "./interfaces"; import { CompiledSharedOptions } from "./lib"; +const NO_LOG_SUCCESS = !!process.env.NO_LOG_SUCCESS; + export function makeNewWorker( compiledSharedOptions: CompiledSharedOptions, params: { @@ -356,7 +358,7 @@ export function makeNewWorker( "Error occurred in event emitter for 'job:success'; this is an issue in your application code and you should fix it", ); } - if (!process.env.NO_LOG_SUCCESS) { + if (!NO_LOG_SUCCESS) { logger.info( `Completed task ${job.id} (${ job.task_identifier From 9d9207ef347ebde69061de370b1115d186b97b1d Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 13:07:28 +0100 Subject: [PATCH 17/20] Release batches before releasing worker pool --- src/interfaces.ts | 13 +++- src/main.ts | 167 +++++++++++++++++++++++++++++++++++++--------- src/worker.ts | 4 +- 3 files changed, 148 insertions(+), 36 deletions(-) diff --git a/src/interfaces.ts b/src/interfaces.ts index bb709f2b..609f0777 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -853,6 +853,15 @@ export type WorkerEventMap = { client: PoolClient; }; + /** + * When a worker pool fails to complete/fail a job + */ + "pool:fatalError": { + workerPool: WorkerPool; + error: unknown; + action: string; + }; + /** * When a worker pool is released */ @@ -1164,9 +1173,9 @@ export type GetJobFunction = ( flagsToSkip: string[] | null, ) => Promise; -export type CompleteJobFunction = (job: DbJob) => Promise; +export type CompleteJobFunction = (job: DbJob) => void; export type FailJobFunction = ( job: DbJob, message: string, replacementPayload: undefined | unknown[], -) => Promise; +) => void; diff --git a/src/main.ts b/src/main.ts index 1cb4cf95..970ab915 100644 --- a/src/main.ts +++ b/src/main.ts @@ -568,9 +568,35 @@ export function _runTaskList( const promise = deferred(); - function deactivate() { + async function deactivate() { if (workerPool._active) { workerPool._active = false; + // TODO: stop the batch()es and await the promises here + const releaseCompleteJobPromise = releaseCompleteJob?.(); + const releaseFailJobPromise = releaseFailJob?.(); + const [releaseCompleteJobResult, releaseFailJobResult] = + await Promise.allSettled([ + releaseCompleteJobPromise, + releaseFailJobPromise, + ]); + if (releaseCompleteJobResult.status === "rejected") { + // Log but continue regardless + logger.error( + `Releasing complete job batcher failed: ${releaseCompleteJobResult.reason}`, + { + error: releaseCompleteJobResult.reason, + }, + ); + } + if (releaseFailJobResult.status === "rejected") { + // Log but continue regardless + logger.error( + `Releasing failed job batcher failed: ${releaseFailJobResult.reason}`, + { + error: releaseFailJobResult.reason, + }, + ); + } return onDeactivate?.(); } } @@ -921,7 +947,7 @@ export function _runTaskList( } }; - const completeJob: CompleteJobFunction = + const { release: releaseCompleteJob, fn: completeJob } = ( completeJobBatchDelay >= 0 ? batch( completeJobBatchDelay, @@ -933,13 +959,34 @@ export function _runTaskList( workerPool.id, jobs, ), + (error, jobs) => { + events.emit("pool:fatalError", { + error, + workerPool, + action: "completeJob", + }); + logger.error( + `Failed to complete jobs '${jobs + .map((j) => j.id) + .join("', '")}':\n${String(error)}`, + { fatalError: error, jobs }, + ); + workerPool.gracefulShutdown(); + }, ) - : (job) => - baseCompleteJob(compiledSharedOptions, withPgClient, workerPool.id, [ - job, - ]); + : { + release: null, + fn: (job) => + baseCompleteJob( + compiledSharedOptions, + withPgClient, + workerPool.id, + [job], + ), + } + ) as { release: (() => void) | null; fn: CompleteJobFunction }; - const failJob: FailJobFunction = + const { release: releaseFailJob, fn: failJob } = ( failJobBatchDelay >= 0 ? batch( failJobBatchDelay, @@ -955,11 +1002,29 @@ export function _runTaskList( workerPool.id, specs, ), + (error, specs) => { + events.emit("pool:fatalError", { + error, + workerPool, + action: "failJob", + }); + logger.error( + `Failed to fail jobs '${specs + .map((spec) => spec.job.id) + .join("', '")}':\n${String(error)}`, + { fatalError: error, specs }, + ); + workerPool.gracefulShutdown(); + }, ) - : (job, message, replacementPayload) => - baseFailJob(compiledSharedOptions, withPgClient, workerPool.id, [ - { job, message, replacementPayload }, - ]); + : { + release: null, + fn: (job, message, replacementPayload) => + baseFailJob(compiledSharedOptions, withPgClient, workerPool.id, [ + { job, message, replacementPayload }, + ]), + } + ) as { release: (() => void) | null; fn: FailJobFunction }; for (let i = 0; i < concurrency; i++) { const worker = makeNewWorker(compiledSharedOptions, { @@ -983,8 +1048,7 @@ export function _runTaskList( } workerPool._workers.splice(workerPool._workers.indexOf(worker), 1); if (!continuous && workerPool._workers.length === 0) { - deactivate(); - terminate(); + deactivate().then(terminate, terminate); } }; worker.promise.then( @@ -1043,26 +1107,65 @@ function batch( delay: number, makeSpec: (...args: TArgs) => TSpec, callback: (specs: ReadonlyArray) => Promise, -): (...args: TArgs) => Promise { - let currentBatch: { specs: TSpec[]; promise: Promise } | null = null; - return (...args) => { - const spec = makeSpec(...args); - if (currentBatch) { - currentBatch.specs.push(spec); - } else { - const specs = [spec]; - currentBatch = { - specs, - promise: (async () => { - await sleep(delay); - currentBatch = null; - return callback(specs); - })(), - }; - } - return currentBatch.promise; + errorHandler: ( + error: unknown, + specs: ReadonlyArray, + ) => void | Promise, +): { + release(): void | Promise; + fn: (...args: TArgs) => void; +} { + let pending = 0; + let releasing = false; + let released = false; + const promise = deferred(); + let currentBatch: { specs: TSpec[]; promise: Promise } | null = null; + return { + async release() { + if (releasing) { + return; + } + releasing = true; + if (pending === 0) { + released = true; + promise.resolve(); + } + await promise; + }, + fn(...args) { + if (released) { + throw new Error( + "This batcher has been released, and so no more calls can be made.", + ); + } + const spec = makeSpec(...args); + if (currentBatch) { + currentBatch.specs.push(spec); + } else { + const specs = [spec]; + currentBatch = { + specs, + promise: (async () => { + pending++; + try { + await sleep(delay); + currentBatch = null; + await callback(specs); + } catch (error) { + errorHandler(error, specs); + } finally { + pending--; + if (pending === 0 && releasing) { + released = true; + promise.resolve(); + } + } + })(), + }; + } + return; + }, }; } - const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); diff --git a/src/worker.ts b/src/worker.ts index a99c74c0..c67a5d82 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -342,7 +342,7 @@ export function makeNewWorker( }`, { failure: true, job, error: err, duration }, ); - await failJob( + failJob( job, message, // "Batch jobs": copy through only the unsuccessful parts of the payload @@ -371,7 +371,7 @@ export function makeNewWorker( ); } - await completeJob(job); + completeJob(job); } events.emit("job:complete", { worker, job, error: err }); } catch (fatalError) { From bf103b99a75e48de6e2940c4a4f6a9a7077ee0ee Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 13:12:35 +0100 Subject: [PATCH 18/20] Tweak perfTest for more logging --- perfTest/run.js | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/perfTest/run.js b/perfTest/run.js index e0c3a708..9e78a6d5 100755 --- a/perfTest/run.js +++ b/perfTest/run.js @@ -50,9 +50,11 @@ async function main() { console.log(); console.log(); console.log("Timing startup/shutdown time..."); - const startupTime = await time(() => { - execSync("node ../dist/cli.js --once", execOptions); + let result; + const startupTime = await time(async () => { + result = await exec(`node ../dist/cli.js --once`, execOptions); }); + logResult(result); console.log(); if (STUCK_JOB_COUNT > 0) { @@ -83,17 +85,7 @@ async function main() { ), ); } - (await Promise.all(promises)).map(({ error, stdout, stderr }) => { - if (error) { - throw error; - } - if (stdout) { - console.log(stdout); - } - if (stderr) { - console.error(stderr); - } - }); + (await Promise.all(promises)).map(logResult); }); console.log( `Jobs per second: ${((1000 * JOB_COUNT) / (dur - startupTime)).toFixed(2)}`, @@ -112,3 +104,15 @@ main().catch((e) => { console.error(e); process.exit(1); }); + +function logResult({ error, stdout, stderr }) { + if (error) { + throw error; + } + if (stdout) { + console.log(stdout); + } + if (stderr) { + console.error(stderr); + } +} From 8fb0174d8fe2bead7780abf0117ac60cdfd5adee Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 14:31:54 +0100 Subject: [PATCH 19/20] Fairer startup time calc --- perfTest/run.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/perfTest/run.js b/perfTest/run.js index 9e78a6d5..4bec116b 100755 --- a/perfTest/run.js +++ b/perfTest/run.js @@ -52,7 +52,12 @@ async function main() { console.log("Timing startup/shutdown time..."); let result; const startupTime = await time(async () => { - result = await exec(`node ../dist/cli.js --once`, execOptions); + result = await exec( + `node ../dist/cli.js --once --once -j ${CONCURRENCY} -m ${ + CONCURRENCY + 1 + }`, + execOptions, + ); }); logResult(result); console.log(); From f46ec8d5651dbdaea111c2a121b8f988599c04f4 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Thu, 6 Jun 2024 15:50:21 +0100 Subject: [PATCH 20/20] Refactor --- src/main.ts | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/main.ts b/src/main.ts index 970ab915..ccecc603 100644 --- a/src/main.ts +++ b/src/main.ts @@ -899,19 +899,21 @@ export function _runTaskList( // Queue is empty, no fetch of jobs in progress; let's fetch them. getJobBaseline = getJobCounter; nextJobs = (async () => { - const jobs = await baseGetJob( - compiledSharedOptions, - withPgClient, - tasks, - workerPool.id, - null, - getJobBatchSize, - ); - jobQueue = jobs.reverse(); - return jobs.length >= getJobBatchSize; - })().finally(() => { - nextJobs = null; - }); + try { + const jobs = await baseGetJob( + compiledSharedOptions, + withPgClient, + tasks, + workerPool.id, + null, + getJobBatchSize, + ); + jobQueue = jobs.reverse(); + return jobs.length >= getJobBatchSize; + } finally { + nextJobs = null; + } + })(); } const fetchedMax = await nextJobs; const job = jobQueue.pop();