diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 881a25ef9a..9cbf183714 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -15,7 +15,7 @@ import { RedisClient, WorkerOptions, } from '../interfaces'; -import { MinimalQueue } from '../types'; +import { MinimalQueue, Serialize } from '../types'; import { delay, DELAY_TIME_1, @@ -55,7 +55,7 @@ export interface WorkerListener< * * This event is triggered when a job enters the 'active' state. */ - active: (job: Job, prev: string) => void; + active: (job: Job, ResultType, NameType>, prev: string) => void; /** * Listen to 'closing' event. @@ -77,7 +77,7 @@ export interface WorkerListener< * This event is triggered when a job has successfully completed. */ completed: ( - job: Job, + job: Job, ResultType, NameType>, result: ResultType, prev: string, ) => void; @@ -106,7 +106,7 @@ export interface WorkerListener< * reaches the stalled limit and it is deleted by the removeOnFail option. */ failed: ( - job: Job | undefined, + job: Job, ResultType, NameType> | undefined, error: Error, prev: string, ) => void; @@ -127,7 +127,7 @@ export interface WorkerListener< * world. */ progress: ( - job: Job, + job: Job, ResultType, NameType>, progress: number | object, ) => void; @@ -171,7 +171,7 @@ export class Worker< private abortDelayController: AbortController | null = null; private asyncFifoQueue: AsyncFifoQueue, ResultType, NameType >>; @@ -187,7 +187,7 @@ export class Worker< private _repeat: Repeat; protected paused: Promise; - protected processFn: Processor; + protected processFn: Processor, ResultType, NameType>; protected running = false; static RateLimitError(): Error { @@ -196,7 +196,7 @@ export class Worker< constructor( name: string, - processor?: string | URL | null | Processor, + processor?: string | URL | null | Processor, ResultType, NameType>, opts?: WorkerOptions, Connection?: typeof RedisConnection, ) { @@ -289,7 +289,7 @@ export class Worker< useWorkerThreads: this.opts.useWorkerThreads, }); - this.processFn = sandbox( + this.processFn = sandbox, ResultType, NameType>( processor, this.childPool, ).bind(this); @@ -348,7 +348,7 @@ export class Worker< } protected callProcessJob( - job: Job, + job: Job, ResultType, NameType>, token: string, ): Promise { return this.processFn(job, token); @@ -357,9 +357,9 @@ export class Worker< protected createJob( data: JobJsonRaw, jobId: string, - ): Job { + ): Job, ResultType, NameType> { return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job< - DataType, + Serialize, ResultType, NameType >; @@ -423,7 +423,7 @@ export class Worker< this.startLockExtenderTimer(jobsInProgress); const asyncFifoQueue = (this.asyncFifoQueue = - new AsyncFifoQueue>()); + new AsyncFifoQueue, ResultType, NameType>>()); let tokenPostfix = 0; @@ -450,7 +450,7 @@ export class Worker< const token = `${this.id}:${tokenPostfix++}`; const fetchedJob = this.retryIfFailed, ResultType, NameType >>( @@ -484,7 +484,7 @@ export class Worker< // Since there can be undefined jobs in the queue (when a job fails or queue is empty) // we iterate until we find a job. - let job: Job | void; + let job: Job, ResultType, NameType> | void; do { job = await asyncFifoQueue.fetch(); } while (!job && asyncFifoQueue.numQueued() > 0); @@ -492,10 +492,10 @@ export class Worker< if (job) { const token = job.token; asyncFifoQueue.add( - this.retryIfFailed>( + this.retryIfFailed, ResultType, NameType>>( () => this.processJob( - >job, + , ResultType, NameType>>job, token, () => asyncFifoQueue.numTotal() <= this.opts.concurrency, jobsInProgress, @@ -533,7 +533,7 @@ export class Worker< bclient: RedisClient, token: string, { block = true }: GetNextJobOptions = {}, - ): Promise | undefined> { + ): Promise, ResultType, NameType> | undefined> { if (this.paused) { if (block) { await this.paused; @@ -608,7 +608,7 @@ will never work with more accuracy than 1ms. */ client: RedisClient, token: string, name?: string, - ): Promise> { + ): Promise, ResultType, NameType>> { const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token, name); this.updateDelays(limitUntil, delayUntil); @@ -719,7 +719,7 @@ will never work with more accuracy than 1ms. */ jobData?: JobJsonRaw, jobId?: string, token?: string, - ): Promise> { + ): Promise, ResultType, NameType>> { if (!jobData) { if (!this.drained) { this.emit('drained'); @@ -740,11 +740,11 @@ will never work with more accuracy than 1ms. */ } async processJob( - job: Job, + job: Job, ResultType, NameType>, token: string, fetchNextCallback = () => true, jobsInProgress: Set<{ job: Job; ts: number }>, - ): Promise> { + ): Promise, ResultType, NameType>> { if (!job || this.closing || this.paused) { return; } @@ -1051,10 +1051,10 @@ will never work with more accuracy than 1ms. */ stalled.forEach((jobId: string) => this.emit('stalled', jobId, 'active')); - const jobPromises: Promise>[] = []; + const jobPromises: Promise, ResultType, NameType>>[] = []; for (let i = 0; i < failed.length; i++) { jobPromises.push( - Job.fromId( + Job.fromId, ResultType, NameType>( this as MinimalQueue, failed[i], ), @@ -1069,8 +1069,8 @@ will never work with more accuracy than 1ms. */ this.notifyFailedJobs(await Promise.all(jobPromises)); } - private notifyFailedJobs(failedJobs: Job[]) { - failedJobs.forEach((job: Job) => + private notifyFailedJobs(failedJobs: Job, ResultType, NameType>[]) { + failedJobs.forEach((job: Job, ResultType, NameType>) => this.emit( 'failed', job, @@ -1081,7 +1081,7 @@ will never work with more accuracy than 1ms. */ } private moveLimitedBackToWait( - job: Job, + job: Job, ResultType, NameType>, token: string, ) { return this.scripts.moveJobFromActiveToWait(job.id, token); diff --git a/src/types/index.ts b/src/types/index.ts index 7d30742ac9..bb6b00e102 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -5,3 +5,4 @@ export * from './job-json-sandbox'; export * from './job-options'; export * from './job-type'; export * from './repeat-strategy'; +export * from './serialize'; \ No newline at end of file diff --git a/src/types/serialize.ts b/src/types/serialize.ts new file mode 100644 index 0000000000..d217f1f08a --- /dev/null +++ b/src/types/serialize.ts @@ -0,0 +1,146 @@ +/** + * The `Serialize` type emulates the operation `JSON.parse(JSON.stringify(x))` done + * when passing the `data` to a `Job` in a `Worker`. + */ +export type Serialize = + /** + * If the input is any then the output must as well be any + */ + IsAny extends true + ? any + : /** + * Under the hood, JSON.stringify calls the `toJSON()` method on his parameter. + */ + T extends { toJSON(): infer U } + ? U extends JsonValue + ? U + : unknown + : /** + * Primitives + */ + T extends JsonPrimitive + ? T + : /** + * Primitive wrappers + */ + T extends String + ? string + : T extends Number + ? number + : T extends Boolean + ? boolean + : /** + * JSON.stringify returns always `{}` for a `Promise` + */ + T extends Promise + ? EmptyObject + : /** + * Map + */ + T extends Map + ? EmptyObject + : /** + * Set + */ + T extends Set + ? EmptyObject + : /** + * Array views + */ + T extends TypedArray + ? Record + : /** + * Some object can't be serialized, so we remove them. + */ + T extends NotJson + ? never + : /** + * Arrays + */ + T extends [] + ? [] + : T extends readonly [infer F, ...infer R] + ? [NeverToNull>, ...Serialize] + : T extends readonly unknown[] + ? Array>> + : /** + * Objects + */ + T extends Record + ? Prettify> + : /** + * Unknown + */ + unknown extends T + ? unknown + : never; + +/** + * Some utils. + */ + +type NotJson = undefined | symbol | ((...args: any[]) => unknown); + +// value is always not JSON => true +// value is always JSON => false +// value is somtimes JSON, sometimes not JSON => boolean +// note: cannot be inlined as logic requires union distribution +type ValueIsNotJson = T extends NotJson ? true : false; + +// note: remove optionality so that produced values are never `undefined`, +// only `true`, `false`, or `boolean` +type IsNotJson = { [K in keyof T]-?: ValueIsNotJson }; + +type SerializeValues = { [K in keyof T]: Serialize }; + +type SerializeObject> = + // required + { + [K in keyof T as unknown extends T[K] + ? never + : IsNotJson[K] extends false + ? K + : never]: SerializeValues[K]; + } & { + // optional + [K in keyof T as unknown extends T[K] + ? K + : // if the value is always JSON, then it's not optional + IsNotJson[K] extends false + ? never + : // if the value is always not JSON, omit it entirely + IsNotJson[K] extends true + ? never + : // if the value is mixed, then it's optional + K]?: SerializeValues[K]; + }; + +type JsonPrimitive = string | number | boolean | null; + +type JsonArray = JsonValue[] | readonly JsonValue[]; + +type JsonObject = { [K in string]: JsonValue } & { [K in string]?: JsonValue }; + +type JsonValue = JsonPrimitive | JsonObject | JsonArray; + +type TypedArray = + | Int8Array + | Uint8Array + | Uint8ClampedArray + | Int16Array + | Uint16Array + | Int32Array + | Uint32Array + | Float32Array + | Float64Array + | BigInt64Array + | BigUint64Array; + +type Prettify = { [K in keyof T]: T[K] } & {}; + +type NeverToNull = [T] extends [never] ? null : T; + +declare const emptyObjectSymbol: unique symbol; +type EmptyObject = { [emptyObjectSymbol]?: never }; + +type IsAny = 0 extends 1 & T ? true : false;