Skip to content

Commit c760525

Browse files
committed
refactor: add setGlobalConcurrency method
1 parent 4c2527e commit c760525

File tree

3 files changed

+25
-21
lines changed

3 files changed

+25
-21
lines changed

src/classes/queue.ts

+12-3
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,6 @@ export class Queue<
167167
get metaValues(): Record<string, string | number> {
168168
return {
169169
'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000,
170-
...(this.opts?.concurrency
171-
? { concurrency: this.opts?.concurrency }
172-
: {}),
173170
};
174171
}
175172

@@ -186,6 +183,18 @@ export class Queue<
186183
});
187184
}
188185

186+
/**
187+
* Enable and set global concurrency value.
188+
* @param concurrency - Maximum number of simultaneous jobs that the workers can handle.
189+
* For instance, setting this value to 1 ensures that no more than one job
190+
* is processed at any given time. If this limit is not defined, there will be no
191+
* restriction on the number of concurrent jobs.
192+
*/
193+
async setGlobalConcurrency(concurrency: number) {
194+
const client = await this.client;
195+
return client.hset(this.keys.meta, 'concurrency', concurrency);
196+
}
197+
189198
/**
190199
* Adds a new job to the queue.
191200
*

src/interfaces/queue-options.ts

-8
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,6 @@ export interface QueueOptions extends QueueBaseOptions {
5454
};
5555
};
5656

57-
/**
58-
* Maximum number of simultaneous jobs that the workers can handle.
59-
* For instance, setting this value to 1 ensures that no more than one job
60-
* is processed at any given time. If this limit is not defined, there will be no
61-
* restriction on the number of concurrent jobs.
62-
*/
63-
concurrency?: number;
64-
6557
settings?: AdvancedRepeatOptions;
6658
}
6759

tests/test_concurrency.ts

+13-10
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ describe('Concurrency', () => {
3131
});
3232

3333
it('should run max concurrency for jobs added', async () => {
34-
const queue = new Queue(queueName, { connection, concurrency: 1, prefix });
34+
const queue = new Queue(queueName, { connection, prefix });
3535
const numJobs = 15;
3636
const jobsData: { name: string; data: any }[] = [];
3737
for (let j = 0; j < numJobs; j++) {
@@ -42,7 +42,7 @@ describe('Concurrency', () => {
4242
}
4343

4444
await queue.addBulk(jobsData);
45-
45+
await queue.setGlobalConcurrency(1);
4646
const bar = new ProgressBar(':bar', { total: numJobs });
4747

4848
let count = 0;
@@ -93,7 +93,7 @@ describe('Concurrency', () => {
9393
}).timeout(16000);
9494

9595
it('emits drained global event only once when worker is idle', async function () {
96-
const queue = new Queue(queueName, { connection, concurrency: 1, prefix });
96+
const queue = new Queue(queueName, { connection, prefix });
9797
const worker = new Worker(
9898
queueName,
9999
async () => {
@@ -119,6 +119,7 @@ describe('Concurrency', () => {
119119
{ name: 'test', data: { foo: 'bar' } },
120120
{ name: 'test', data: { foo: 'baz' } },
121121
]);
122+
await queue.setGlobalConcurrency(1);
122123

123124
await delay(4000);
124125

@@ -138,11 +139,11 @@ describe('Concurrency', () => {
138139

139140
const queue = new Queue(queueName, {
140141
connection,
141-
concurrency: 1,
142142
prefix,
143143
});
144144
const queueEvents = new QueueEvents(queueName, { connection, prefix });
145145
await queueEvents.waitUntilReady();
146+
await queue.setGlobalConcurrency(1);
146147

147148
const worker = new Worker(
148149
queueName,
@@ -222,11 +223,11 @@ describe('Concurrency', () => {
222223

223224
const queue = new Queue(queueName, {
224225
connection,
225-
concurrency: 1,
226226
prefix,
227227
});
228228
const queueEvents = new QueueEvents(queueName, { connection, prefix });
229229
await queueEvents.waitUntilReady();
230+
await queue.setGlobalConcurrency(1);
230231

231232
const worker = new Worker(
232233
queueName,
@@ -304,7 +305,6 @@ describe('Concurrency', () => {
304305
const flow = new FlowProducer({ connection, prefix });
305306
const queue = new Queue(queueName, {
306307
connection,
307-
concurrency: 1,
308308
prefix,
309309
});
310310

@@ -317,6 +317,8 @@ describe('Concurrency', () => {
317317
}
318318

319319
await queue.addBulk(jobsData);
320+
await queue.setGlobalConcurrency(1);
321+
320322
const name = 'child-job';
321323

322324
await flow.add({
@@ -394,13 +396,13 @@ describe('Concurrency', () => {
394396
const globalConcurrency = 2;
395397
const queue = new Queue(queueName, {
396398
connection,
397-
concurrency: globalConcurrency,
398399
prefix,
399400
});
400401

401402
for (let j = 0; j < numJobs; j++) {
402403
await queue.add('test-stalled', { foo: j % 2 });
403404
}
405+
await queue.setGlobalConcurrency(globalConcurrency);
404406

405407
const concurrency = 4;
406408

@@ -486,7 +488,6 @@ describe('Concurrency', () => {
486488
const globalConcurrency = 1;
487489
const queue = new Queue(queueName, {
488490
connection,
489-
concurrency: globalConcurrency,
490491
prefix,
491492
});
492493

@@ -497,6 +498,7 @@ describe('Concurrency', () => {
497498
{ attempts: 2, backoff: 100 },
498499
);
499500
}
501+
await queue.setGlobalConcurrency(globalConcurrency);
500502

501503
const concurrency = 10;
502504

@@ -538,7 +540,6 @@ describe('Concurrency', () => {
538540
const globalConcurrency = 1;
539541
const queue = new Queue(queueName, {
540542
connection,
541-
concurrency: globalConcurrency,
542543
prefix,
543544
});
544545

@@ -549,6 +550,7 @@ describe('Concurrency', () => {
549550
{ attempts: 2, backoff: 0 },
550551
);
551552
}
553+
await queue.setGlobalConcurrency(globalConcurrency);
552554

553555
const concurrency = 4;
554556

@@ -591,9 +593,10 @@ describe('Concurrency', () => {
591593
const globalConcurrency = 1;
592594
const queue = new Queue(queueName, {
593595
connection,
594-
concurrency: globalConcurrency,
595596
prefix,
596597
});
598+
await queue.waitUntilReady();
599+
await queue.setGlobalConcurrency(globalConcurrency);
597600
const worker = new Worker(queueName, null, {
598601
connection,
599602
lockRenewTime: 200,

0 commit comments

Comments
 (0)