-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdispatch-queue.js
More file actions
112 lines (100 loc) · 3.09 KB
/
Copy pathdispatch-queue.js
File metadata and controls
112 lines (100 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import { randomUUID } from 'crypto';
import { getDb } from './db.js';
import { sqliteNow } from './dispatcher-utils.js';
// Dispatch kinds accepted by this module. Insertion order matters: it is the
// order in which the values are listed in validation error messages.
const VALID_DISPATCH_KINDS = new Set([
  'manual',
  'chain',
  'retry',
]);

// Queue-row statuses accepted by this module (same ordering note as above).
const VALID_DISPATCH_STATUSES = new Set(['pending', 'claimed', 'awaiting_approval', 'done', 'cancelled']);
/**
 * Reject any dispatch kind that is not a member of VALID_DISPATCH_KINDS.
 *
 * @param {*} kind - Candidate dispatch kind.
 * @returns {void}
 * @throws {Error} When `kind` is not in VALID_DISPATCH_KINDS; the message lists the valid kinds.
 */
function assertKind(kind) {
  if (VALID_DISPATCH_KINDS.has(kind)) {
    return;
  }
  const allowed = Array.from(VALID_DISPATCH_KINDS).join(', ');
  throw new Error(`Invalid dispatch kind "${kind}". Valid: ${allowed}`);
}
/**
 * Reject any dispatch status that is not a member of VALID_DISPATCH_STATUSES.
 *
 * @param {*} status - Candidate status value.
 * @returns {void}
 * @throws {Error} When `status` is not in VALID_DISPATCH_STATUSES; the message lists the valid statuses.
 */
function assertStatus(status) {
  if (VALID_DISPATCH_STATUSES.has(status)) {
    return;
  }
  const allowed = Array.from(VALID_DISPATCH_STATUSES).join(', ');
  throw new Error(`Invalid dispatch status "${status}". Valid: ${allowed}`);
}
/**
 * Insert a new row into job_dispatch_queue and return it as read back by
 * getDispatch().
 *
 * created_at is always set by SQLite (`datetime('now')`); every other column
 * comes from the arguments or the defaults described below.
 *
 * @param {*} jobId - Value stored in the row's job_id column.
 * @param {object|null} [opts] - Optional overrides; `null`/`undefined` are treated as `{}`.
 * @param {string} [opts.id] - Row id; a random UUID is generated when falsy.
 * @param {string} [opts.kind] - Dispatch kind; 'manual' when falsy.
 * @param {string} [opts.status] - Initial status; 'pending' when falsy.
 * @param {string} [opts.scheduled_for] - scheduled_for value; `sqliteNow(-1000)` when falsy.
 * @param {string} [opts.source_run_id] - Stored as NULL when falsy.
 * @param {string} [opts.retry_of_run_id] - Stored as NULL when falsy.
 * @param {string} [opts.claimed_at] - Stored as NULL when falsy.
 * @param {string} [opts.processed_at] - Stored as NULL when falsy.
 * @returns {object|null} Result of getDispatch(id) after the insert.
 * @throws {Error} When the resolved kind or status fails validation.
 */
export function enqueueDispatch(jobId, opts = {}) {
  // The `= {}` default only applies to `undefined`; an explicit `null` would
  // otherwise throw a TypeError on the first property read below.
  const options = opts || {};

  const db = getDb();
  const id = options.id || randomUUID();
  const kind = options.kind || 'manual';
  const status = options.status || 'pending';

  // Validate before touching the table so a bad value never produces a row.
  assertKind(kind);
  assertStatus(status);

  db.prepare(`
INSERT INTO job_dispatch_queue (
id, job_id, dispatch_kind, status, scheduled_for,
source_run_id, retry_of_run_id, created_at, claimed_at, processed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), ?, ?)
`).run(
    id,
    jobId,
    kind,
    status,
    // NOTE(review): presumably a timestamp slightly in the past so the row is
    // immediately due — confirm against sqliteNow in dispatcher-utils.
    options.scheduled_for || sqliteNow(-1000),
    options.source_run_id || null,
    options.retry_of_run_id || null,
    options.claimed_at || null,
    options.processed_at || null,
  );

  return getDispatch(id);
}
/**
 * Look up a single job_dispatch_queue row by its id.
 *
 * @param {*} id - Value matched against the id column.
 * @returns {object|null} The statement's result, or null when it is falsy (no row).
 */
export function getDispatch(id) {
  const lookup = getDb().prepare('SELECT * FROM job_dispatch_queue WHERE id = ?');
  const row = lookup.get(id);
  return row ? row : null;
}
/**
 * Fetch pending dispatches that are due, each with its job's name as job_name.
 *
 * A row qualifies when its status is 'pending', its scheduled_for is at or
 * before SQLite's `datetime('now')`, and either the joined job has
 * enabled = 1 or the dispatch kind is 'manual'. Results are ordered by
 * scheduled_for, then created_at, both ascending.
 *
 * @param {number} [limit=100] - Value bound to the LIMIT clause.
 * @returns {*} Whatever the prepared statement's all() returns for the query.
 */
export function getDueDispatches(limit = 100) {
  const dueSql = `
SELECT q.*, j.name as job_name
FROM job_dispatch_queue q
JOIN jobs j ON q.job_id = j.id
WHERE q.status = 'pending'
AND q.scheduled_for <= datetime('now')
AND (j.enabled = 1 OR q.dispatch_kind = 'manual')
ORDER BY q.scheduled_for ASC, q.created_at ASC
LIMIT ?
`;
  const statement = getDb().prepare(dueSql);
  return statement.all(limit);
}
/**
 * Move a pending dispatch to 'claimed' and stamp claimed_at with SQLite's
 * `datetime('now')`.
 *
 * The UPDATE only matches rows whose status is currently 'pending', so a row
 * in any other status (or a missing id) is left untouched.
 *
 * @param {*} id - Dispatch id to claim.
 * @returns {object|null} The row via getDispatch(id) when the UPDATE changed
 *   a row, otherwise null.
 */
export function claimDispatch(id) {
  const claimSql = `
UPDATE job_dispatch_queue
SET status = 'claimed',
claimed_at = datetime('now')
WHERE id = ? AND status = 'pending'
`;
  const outcome = getDb().prepare(claimSql).run(id);
  if (outcome.changes > 0) {
    return getDispatch(id);
  }
  return null;
}
/**
 * Return a 'claimed' or 'awaiting_approval' dispatch to 'pending' and clear
 * its claimed_at.
 *
 * @param {*} id - Dispatch id to release.
 * @param {string|null} [scheduledFor=null] - New scheduled_for value; when
 *   null the COALESCE in the UPDATE keeps the row's existing scheduled_for.
 * @returns {object|null} The row via getDispatch(id) when the UPDATE changed
 *   a row, otherwise null.
 */
export function releaseDispatch(id, scheduledFor = null) {
  const releaseSql = `
UPDATE job_dispatch_queue
SET status = 'pending',
scheduled_for = COALESCE(?, scheduled_for),
claimed_at = NULL
WHERE id = ? AND status IN ('claimed', 'awaiting_approval')
`;
  const outcome = getDb().prepare(releaseSql).run(scheduledFor, id);
  if (outcome.changes > 0) {
    return getDispatch(id);
  }
  return null;
}
/**
 * Overwrite a dispatch's status, stamping processed_at for terminal statuses.
 *
 * For 'done' and 'cancelled' the row's processed_at is set to sqliteNow();
 * for any other status a null is bound and the COALESCE in the UPDATE leaves
 * the existing processed_at as it was. The UPDATE is keyed on id only — the
 * row's current status is not checked.
 *
 * @param {*} id - Dispatch id to update.
 * @param {string} status - New status; must pass assertStatus.
 * @returns {object|null} Result of getDispatch(id) after the update.
 * @throws {Error} When `status` fails validation.
 */
export function setDispatchStatus(id, status) {
  assertStatus(status);

  let processedAt = null;
  if (status === 'done' || status === 'cancelled') {
    processedAt = sqliteNow();
  }

  const updateSql = `
UPDATE job_dispatch_queue
SET status = ?,
processed_at = COALESCE(?, processed_at)
WHERE id = ?
`;
  getDb().prepare(updateSql).run(status, processedAt, id);

  return getDispatch(id);
}
/**
 * List job_dispatch_queue rows for one job, ordered by created_at descending.
 *
 * @param {*} jobId - Value matched against the job_id column.
 * @param {number} [limit=20] - Value bound to the LIMIT clause.
 * @returns {*} Whatever the prepared statement's all() returns for the query.
 */
export function listDispatchesForJob(jobId, limit = 20) {
  const historySql = `
SELECT *
FROM job_dispatch_queue
WHERE job_id = ?
ORDER BY created_at DESC
LIMIT ?
`;
  const statement = getDb().prepare(historySql);
  return statement.all(jobId, limit);
}