Skip to content

Commit 115e528

Browse files
committed
refactor: deserialize custom event object
1 parent 5e16f76 commit 115e528

File tree

2 files changed

+72
-6
lines changed

2 files changed

+72
-6
lines changed

src/classes/queue-events.ts

+26-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
array2obj,
99
clientCommandMessageReg,
1010
isRedisInstance,
11+
parseObjectValues,
1112
QUEUE_EVENT_SUFFIX,
1213
} from '../utils';
1314
import { QueueBase } from './queue-base';
@@ -298,20 +299,40 @@ export class QueueEvents extends QueueBase {
298299
id = events[i][0];
299300
const args = array2obj(events[i][1]);
300301

302+
let { event, ...restArgs } = args;
303+
301304
//
302305
// TODO: we may need to have a separate xtream for progress data
303306
// to avoid this hack.
304-
switch (args.event) {
307+
switch (event) {
308+
case 'active':
309+
case 'added':
310+
case 'cleaned':
311+
case 'debounced': // TODO: to be removed in next breaking change
312+
case 'deduplicated':
313+
case 'delayed':
314+
case 'duplicated':
315+
case 'error':
316+
case 'failed':
317+
case 'paused':
318+
case 'removed':
319+
case 'resumed':
320+
case 'retries-exhausted':
321+
case 'stalled':
322+
case 'waiting':
323+
case 'waiting-children':
324+
break;
305325
case 'progress':
306-
args.data = JSON.parse(args.data);
326+
restArgs.data = JSON.parse(restArgs.data);
307327
break;
308328
case 'completed':
309-
args.returnvalue = JSON.parse(args.returnvalue);
329+
restArgs.returnvalue = JSON.parse(restArgs.returnvalue);
330+
break;
331+
default:
332+
restArgs = parseObjectValues(restArgs);
310333
break;
311334
}
312335

313-
const { event, ...restArgs } = args;
314-
315336
if (event === 'drained') {
316337
this.emit(event, id);
317338
} else {

tests/test_events.ts

+46-1
Original file line numberDiff line numberDiff line change
@@ -1269,7 +1269,7 @@ describe('events', function () {
12691269
});
12701270

12711271
describe('when publishing custom events', function () {
1272-
it('emits waiting when a job has been added', async () => {
1272+
it('emits custom event', async () => {
12731273
const queueName2 = `test-${v4()}`;
12741274
const queueEventsProducer = new QueueEventsProducer(queueName2, {
12751275
connection,
@@ -1311,5 +1311,50 @@ describe('events', function () {
13111311
await queueEvents2.close();
13121312
await removeAllQueueData(new IORedis(redisHost), queueName2);
13131313
});
1314+
1315+
describe('when published event is an object', function () {
1316+
it('deserialize event', async () => {
1317+
const queueName2 = `test-${v4()}`;
1318+
const queueEventsProducer = new QueueEventsProducer(queueName2, {
1319+
connection,
1320+
prefix,
1321+
});
1322+
const queueEvents2 = new QueueEvents(queueName2, {
1323+
autorun: false,
1324+
connection,
1325+
prefix,
1326+
lastEventId: '0-0',
1327+
});
1328+
await queueEvents2.waitUntilReady();
1329+
1330+
interface CustomListener extends QueueEventsListener {
1331+
example: (args: { custom: { foo: string } }, id: string) => void;
1332+
}
1333+
const customEvent = new Promise<void>(resolve => {
1334+
queueEvents2.on<CustomListener>('example', async ({ custom }) => {
1335+
await delay(250);
1336+
await expect(custom.foo).to.be.equal('value');
1337+
resolve();
1338+
});
1339+
});
1340+
1341+
interface CustomEventPayload {
1342+
eventName: string;
1343+
custom: { foo: string };
1344+
}
1345+
1346+
await queueEventsProducer.publishEvent<CustomEventPayload>({
1347+
eventName: 'example',
1348+
custom: { foo: 'value' },
1349+
});
1350+
1351+
queueEvents2.run();
1352+
await customEvent;
1353+
1354+
await queueEventsProducer.close();
1355+
await queueEvents2.close();
1356+
await removeAllQueueData(new IORedis(redisHost), queueName2);
1357+
});
1358+
});
13141359
});
13151360
});

0 commit comments

Comments
 (0)