Skip to content

Commit

Permalink
types and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Aug 11, 2024
1 parent 155ef19 commit 08c8d4b
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 35 deletions.
83 changes: 55 additions & 28 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
- [`sendAfter(name, data, options, value)`](#sendaftername-data-options-value)
- [`sendThrottled(name, data, options, seconds, key)`](#sendthrottledname-data-options-seconds-key)
- [`sendDebounced(name, data, options, seconds, key)`](#senddebouncedname-data-options-seconds-key)
- [`insert([jobs])`](#insertjobs)
- [`insert(Job[])`](#insertjob)
- [`fetch(name, options)`](#fetchname-options)
- [`work()`](#work)
- [`work(name, options, handler)`](#workname-options-handler)
- [`work(name, handler)`](#workname-handler)
- [`notifyWorker(id)`](#notifyworkerid)
- [`offWork(value)`](#offworkvalue)
- [`publish(event, data, options)`](#publishevent-data-options)
- [`subscribe(event, name)`](#subscribeevent-name)
Expand All @@ -51,11 +52,14 @@
- [`complete(name, [ids], options)`](#completename-ids-options)
- [`fail(name, id, data, options)`](#failname-id-data-options)
- [`fail(name, [ids], options)`](#failname-ids-options)
- [`notifyWorker(id)`](#notifyworkerid)
- [`getQueueSize(name, options)`](#getqueuesizename-options)
- [`getJobById(name, id, options)`](#getjobbyidname-id-options)
- [`createQueue(name, Queue)`](#createqueuename-queue)
- [`updateQueue(name, options)`](#updatequeuename-options)
- [`purgeQueue(name)`](#purgequeuename)
- [`deleteQueue(name)`](#deletequeuename)
- [`getQueues()`](#getqueues)
- [`getQueue(name)`](#getqueuename)
- [`getQueueSize(name, options)`](#getqueuesizename-options)
- [`clearStorage()`](#clearstorage)
- [`isInstalled()`](#isinstalled)
- [`schemaVersion()`](#schemaversion)
Expand Down Expand Up @@ -635,7 +639,7 @@ Like, `sendThrottled()`, but instead of rejecting if a job is already sent in th

This is a convenience version of `send()` with the `singletonSeconds`, `singletonKey` and `singletonNextSlot` option assigned. The `key` argument is optional.

## `insert([jobs])`
## `insert(Job[])`

Create multiple jobs in one request with an array of objects.

Expand All @@ -662,6 +666,8 @@ interface JobInsert<T = object> {

## `fetch(name, options)`

Returns an array of jobs from a queue

**Arguments**
- `name`: string
- `options`: object
Expand Down Expand Up @@ -703,8 +709,6 @@ interface JobInsert<T = object> {
}
```

**Resolves**
- `[job]`: array of jobs

**Notes**

Expand Down Expand Up @@ -804,6 +808,11 @@ await boss.work(queue, async ([ job ]) => {
})
```

## `notifyWorker(id)`

Notifies a worker by id to bypass the job polling interval (see `pollingIntervalSeconds`) for this iteration in the loop.


## `offWork(value)`

Removes a worker by name or id and stops polling.
Expand Down Expand Up @@ -936,31 +945,16 @@ The promise will resolve on a successful failure state assignment, or reject if

> See comments above on `cancel([ids])` regarding when the promise will resolve or reject because of a batch operation.
## `notifyWorker(id)`

Notifies a worker by id to bypass the job polling interval (see `pollingIntervalSeconds`) for this iteration in the loop.

## `getQueueSize(name, options)`

Returns the number of pending jobs in a queue by name.
## `getJobById(name, id, options)`

`options`: Optional, object.
Retrieves a job with all metadata by name and id

| Prop | Type | Description | Default |
| - | - | - | - |
|`before`| string | count jobs in states before this state | states.active |
**options**

As an example, the following options object include active jobs along with created and retry.
* `includeArchive`: bool, default: false

```js
{
before: states.completed
}
```

## `getJobById(name, id, options)`

Retrieves a job with all metadata by id in either the primary or archive storage.
If `true`, it will search for the job in the archive if not found in the primary job storage.

## `createQueue(name, Queue)`

Expand All @@ -984,15 +978,48 @@ Allowed policy values:
| Policy | Description |
| - | - |
| standard | (Default) Supports all standard features such as deferral, priority, and throttling |
| debounced | All standard features, but only allows 1 job to be queued, unlimited active. Can be extended with `singletonKey` |
| short | All standard features, but only allows 1 job to be queued, unlimited active. Can be extended with `singletonKey` |
| singleton | All standard features, but only allows 1 job to be active, unlimited queued. Can be extended with `singletonKey` |
| stately | Combination of debounced and singleton: Only allows 1 job per state, queued and/or active. Can be extended with `singletonKey` |
| stately | Combination of short and singleton: Only allows 1 job per state, queued and/or active. Can be extended with `singletonKey` |
## `updateQueue(name, options)`
Updates options on an existing queue. The policy can be changed, but understand this won't impact existing jobs in flight and will only apply the new policy on new incoming jobs.
## `purgeQueue(name)`
Deletes all queued jobs in a queue.
## `deleteQueue(name)`
Deletes a queue and all jobs from the active job table. Any jobs in the archive table are retained.
## `getQueues()`
Returns all queues
## `getQueue(name)`
Returns a queue by name
## `getQueueSize(name, options)`
Returns the number of pending jobs in a queue by name.
`options`: Optional, object.
| Prop | Type | Description | Default |
| - | - | - | - |
|`before`| string | count jobs in states before this state | states.active |
As an example, the following options object include active jobs along with created and retry.
```js
{
before: states.completed
}
```
## `clearStorage()`
Utility function if and when needed to clear all job and archive storage tables. Internally, this issues a `TRUNCATE` command.
Expand Down
2 changes: 1 addition & 1 deletion src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ function getQueues (schema) {
dead_letter as "deadLetter",
created_on as "createdOn",
updated_on as "updatedOn"
FROM ${schema}.queue
FROM ${schema}.queue
`
}

Expand Down
3 changes: 3 additions & 0 deletions test/queueTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,16 @@ describe('queues', function () {

let queueObj = await boss.getQueue(queue)

assert.strictEqual(queue, queueObj.name)
assert.strictEqual(createProps.policy, queueObj.policy)
assert.strictEqual(createProps.retryLimit, queueObj.retryLimit)
assert.strictEqual(createProps.retryBackoff, queueObj.retryBackoff)
assert.strictEqual(createProps.retryDelay, queueObj.retryDelay)
assert.strictEqual(createProps.expireInSeconds, queueObj.expireInSeconds)
assert.strictEqual(createProps.retentionMinutes, queueObj.retentionMinutes)
assert.strictEqual(createProps.deadLetter, queueObj.deadLetter)
assert(queueObj.createdOn)
assert(queueObj.updatedOn)

deadLetter = `${queue}_dlq2`
await boss.createQueue(deadLetter)
Expand Down
1 change: 0 additions & 1 deletion test/scheduleTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ describe('schedule', function () {
await boss.schedule(queue, '* * * * *')
assert(false)
} catch (err) {
console.log(err)
assert(true)
}
})
Expand Down
2 changes: 1 addition & 1 deletion test/testHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async function start (options) {
try {
options = getConfig(options)
const boss = new PgBoss(options)
boss.on('error', err => console.log({ schema: options.schema, message: err.message }))
// boss.on('error', err => console.log({ schema: options.schema, message: err.message }))
await boss.start()
// auto-create queue for tests
if (!options.noDefault) {
Expand Down
8 changes: 4 additions & 4 deletions types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,15 @@ declare class PgBoss extends EventEmitter {
fail(name: string, id: string, data: object, options?: PgBoss.ConnectionOptions): Promise<void>;
fail(name: string, ids: string[], options?: PgBoss.ConnectionOptions): Promise<void>;

getQueueSize(name: string, options?: object): Promise<number>;
getJobById(name: string, id: string, options?: PgBoss.ConnectionOptions & { includeArchive: bool }): Promise<PgBoss.JobWithMetadata | null>;

createQueue(name: string, options?: PgBoss.Queue): Promise<void>;
getQueue(name: string): Promise<PgBoss.Queue | null>;
getQueues(): Promise<PgBoss.QueueResult[]>;
updateQueue(name: string, options?: PgBoss.Queue): Promise<void>;
deleteQueue(name: string): Promise<void>;
purgeQueue(name: string): Promise<void>;
getQueues(): Promise<PgBoss.QueueResult[]>;
getQueue(name: string): Promise<PgBoss.QueueResult | null>;
getQueueSize(name: string, options?: { before: 'retry' | 'active' | 'completed' | 'cancelled' | 'failed' }): Promise<number>;

clearStorage(): Promise<void>;
archive(): Promise<void>;
Expand Down

0 comments on commit 08c8d4b

Please sign in to comment.