Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 64 additions & 5 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ Promise queue with concurrency control.
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<EventName> { // eslint-disable-line @typescript-eslint/naming-convention, unicorn/prefer-event-target
readonly #carryoverConcurrencyCount: boolean;

readonly #isIntervalIgnored: boolean;

#intervalCount = 0;

readonly #intervalCap: number;
#intervalCap: number;

readonly #interval: number;
#interval: number;

#intervalEnd = 0;

Expand Down Expand Up @@ -77,7 +75,6 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
}

this.#carryoverConcurrencyCount = options.carryoverConcurrencyCount!;
this.#isIntervalIgnored = options.intervalCap === Number.POSITIVE_INFINITY || options.interval === 0;
this.#intervalCap = options.intervalCap;
this.#interval = options.interval;
this.#queue = new options.queueClass!();
Expand All @@ -88,6 +85,68 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
this.#isPaused = options.autoStart === false;
}

get #isIntervalIgnored(): boolean {
return this.#intervalCap === Number.POSITIVE_INFINITY || this.#interval === 0;
}

get interval(): number {
return this.#interval;
}

set interval(newInterval: number) {
if (!(Number.isFinite(newInterval) && newInterval >= 0)) {
throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${newInterval}\` (${typeof newInterval})`);
}

if (this.#interval === newInterval) {
return;
}

this.#interval = newInterval;

const now = Date.now();

const difference = newInterval - this.#interval;

this.#intervalEnd += difference;

if (this.isPaused) {
return;
}

clearInterval(this.#intervalId);

// If the interval would have already ended, immediately trigger it
if (this.#intervalEnd <= now) {
this.#onInterval();
this.#initializeIntervalIfNeeded();
} else {
// If the interval is still in the future, restart it to the correct delay
this.#timeoutId = setTimeout(() => {
this.#timeoutId = undefined;
this.#onInterval();
this.#initializeIntervalIfNeeded();
}, this.#intervalEnd - now);
}
}

get intervalCap(): number {
return this.#intervalCap;
}

set intervalCap(newIntervalCap: number) {
if (!(typeof newIntervalCap === 'number' && newIntervalCap >= 1)) {
throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${newIntervalCap?.toString() ?? ''}\` (${typeof newIntervalCap})`);
}

if (this.#intervalCap === newIntervalCap) {
return;
}

this.#intervalCap = newIntervalCap;
this.#tryToStartAnother();
}

get #doesIntervalAllowAnother(): boolean {
return this.#isIntervalIgnored || this.#intervalCount < this.#intervalCap;
}
Expand Down