From 463f2fc892895b7d0c14e91574c66e4e565b1638 Mon Sep 17 00:00:00 2001 From: Tim Jones Date: Sat, 12 Oct 2024 17:39:59 -0500 Subject: [PATCH] 511 - add create_queue db migration (#513) * add missing migration for create_queue function * versioning --- package-lock.json | 4 +-- package.json | 5 ++-- src/migrationStore.js | 66 +++++++++++++++++++++++++++++++++++++++++++ src/plans.js | 2 +- version.json | 2 +- 5 files changed, 73 insertions(+), 6 deletions(-) diff --git a/package-lock.json b/package-lock.json index 1444249..51348f3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "pg-boss", - "version": "10.1.4", + "version": "10.1.5", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "pg-boss", - "version": "10.1.4", + "version": "10.1.5", "license": "MIT", "dependencies": { "cron-parser": "^4.9.0", diff --git a/package.json b/package.json index ea52a56..09a767c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "pg-boss", - "version": "10.1.4", + "version": "10.1.5", "description": "Queueing jobs in Postgres from Node.js like a boss", "main": "./src/index.js", "engines": { @@ -23,7 +23,8 @@ "cover": "nyc npm test", "tsc": "tsc --noEmit types.d.ts", "readme": "node ./test/readme.js", - "migrate": "node -e 'console.log(require(\"./src\").getMigrationPlans())'" + "db:migrate": "node -e 'console.log(require(\"./src\").getMigrationPlans())'", + "db:construct": "node -e 'console.log(require(\"./src\").getConstructionPlans())'" }, "mocha": { "timeout": 10000, diff --git a/src/migrationStore.js b/src/migrationStore.js index 251fd36..b2e4e2a 100644 --- a/src/migrationStore.js +++ b/src/migrationStore.js @@ -64,6 +64,72 @@ function migrate (value, version, migrations) { function getAll (schema) { return [ + { + release: '10.1.5', + version: 24, + previous: 23, + install: [ + ` + CREATE OR REPLACE FUNCTION ${schema}.create_queue(queue_name text, options json) + RETURNS VOID AS + $$ + DECLARE + table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex'); + queue_created_on timestamptz; + BEGIN + + WITH q as ( + INSERT INTO ${schema}.queue ( + name, + policy, + retry_limit, + retry_delay, + retry_backoff, + expire_seconds, + retention_minutes, + dead_letter, + partition_name + ) + VALUES ( + queue_name, + options->>'policy', + (options->>'retryLimit')::int, + (options->>'retryDelay')::int, + (options->>'retryBackoff')::bool, + (options->>'expireInSeconds')::int, + (options->>'retentionMinutes')::int, + options->>'deadLetter', + table_name + ) + ON CONFLICT DO NOTHING + RETURNING created_on + ) + SELECT created_on into queue_created_on from q; + + IF queue_created_on IS NULL THEN + RETURN; + END IF; + + EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name); + + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD PRIMARY KEY (name, id)', table_name); + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name); + EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i1 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''created'' AND policy = ''short''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i2 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''active'' AND policy = ''singleton''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i3 ON ${schema}.%1$I (name, state, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''stately''', table_name); + EXECUTE format('CREATE UNIQUE INDEX %1$s_i4 ON ${schema}.%1$I (name, singleton_on, COALESCE(singleton_key, '''')) WHERE state <> ''cancelled'' AND singleton_on IS NOT NULL', table_name); + EXECUTE format('CREATE INDEX %1$s_i5 ON ${schema}.%1$I (name, start_after) INCLUDE (priority, created_on, id) WHERE state < ''active''', table_name); + + EXECUTE format('ALTER TABLE ${schema}.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name); + EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.%I FOR VALUES IN (%L)', table_name, queue_name); + END; + $$ + LANGUAGE plpgsql + ` + ], + uninstall: [] + }, { release: '10.1.1', version: 23, diff --git a/src/plans.js b/src/plans.js index 941dc0b..8489ff8 100644 --- a/src/plans.js +++ b/src/plans.js @@ -332,7 +332,7 @@ function createPrimaryKeyArchive (schema) { } function createIndexJobPolicyShort (schema) { - return `CREATE UNIQUE INDEX job_i1 ON ${schema}.job (name, COALESCE(singleton_key, '')) WHERE state = '${JOB_STATES.created}' AND policy = '${QUEUE_POLICIES.short}';` + return `CREATE UNIQUE INDEX job_i1 ON ${schema}.job (name, COALESCE(singleton_key, '')) WHERE state = '${JOB_STATES.created}' AND policy = '${QUEUE_POLICIES.short}'` } function createIndexJobPolicySingleton (schema) { diff --git a/version.json b/version.json index 2e6fc91..bbdad9b 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "schema": 23 + "schema": 24 }