From 4090f4cb4fef00db8bf6c2621f02a264210822a7 Mon Sep 17 00:00:00 2001 From: Jesse Carter Date: Mon, 7 Dec 2020 18:42:36 -0500 Subject: [PATCH] test(rabbitmq): additional integration test coverage ensure that subscribers with similar routing key patterns only receive messages intended for them re #202 --- .../rabbitmq/e2e/configuration.e2e-spec.ts | 5 +- .../e2e/rpc-no-direct-reply.e2e-spec.ts | 8 +- .../e2e/subscribe-no-handlers.e2e-spec.ts | 12 ++- .../rabbitmq/e2e/subscribe.e2e-spec.ts | 77 ++++++++++++++++++- packages/rabbitmq/src/rabbitmq.module.ts | 1 - 5 files changed, 93 insertions(+), 10 deletions(-) diff --git a/integration/rabbitmq/e2e/configuration.e2e-spec.ts b/integration/rabbitmq/e2e/configuration.e2e-spec.ts index 76d3018dc..9e5d5fb6a 100644 --- a/integration/rabbitmq/e2e/configuration.e2e-spec.ts +++ b/integration/rabbitmq/e2e/configuration.e2e-spec.ts @@ -19,7 +19,10 @@ class RabbitConfig { describe('Module Configuration', () => { let app: TestingModule; - afterEach(() => jest.clearAllMocks()); + afterEach(async () => { + jest.clearAllMocks(); + await app.close(); + }); describe('forRoot', () => { it('should configure RabbitMQ', async () => { diff --git a/integration/rabbitmq/e2e/rpc-no-direct-reply.e2e-spec.ts b/integration/rabbitmq/e2e/rpc-no-direct-reply.e2e-spec.ts index 68aed3561..4481ade02 100644 --- a/integration/rabbitmq/e2e/rpc-no-direct-reply.e2e-spec.ts +++ b/integration/rabbitmq/e2e/rpc-no-direct-reply.e2e-spec.ts @@ -32,7 +32,7 @@ describe('Rabbit Direct Reply To', () => { const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost'; const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`; - beforeEach(async () => { + beforeAll(async () => { const moduleFixture = await Test.createTestingModule({ providers: [RpcService], imports: [ @@ -55,7 +55,11 @@ describe('Rabbit Direct Reply To', () => { await app.init(); }); - it('should not receive subscribe messages because register handlers is disabled', async done => { + afterAll(async () => { + await app.close(); + }); + + it('should not receive subscribe messages because register handlers is disabled', async (done) => { await expect( amqpConnection.request({ exchange, diff --git a/integration/rabbitmq/e2e/subscribe-no-handlers.e2e-spec.ts b/integration/rabbitmq/e2e/subscribe-no-handlers.e2e-spec.ts index 9e8567760..9317bd2d3 100644 --- a/integration/rabbitmq/e2e/subscribe-no-handlers.e2e-spec.ts +++ b/integration/rabbitmq/e2e/subscribe-no-handlers.e2e-spec.ts @@ -31,7 +31,7 @@ describe('Rabbit Subscribe Without Register Handlers', () => { const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost'; const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`; - beforeEach(async () => { + beforeAll(async () => { const moduleFixture = await Test.createTestingModule({ providers: [SubscribeService], imports: [ @@ -54,14 +54,20 @@ describe('Rabbit Subscribe Without Register Handlers', () => { await app.init(); }); - it('should not receive subscribe messages because register handlers is disabled', async done => { + afterAll(async () => { + await app.close(); + }); + + it('should not receive subscribe messages because register handlers is disabled', async (done) => { [routingKey1, routingKey2].forEach((x, i) => amqpConnection.publish(exchange, x, `testMessage-${i}`), ); + expect.assertions(1); + setTimeout(() => { expect(testHandler).not.toHaveBeenCalled(); done(); - }, 50); + }, 100); }); }); diff --git a/integration/rabbitmq/e2e/subscribe.e2e-spec.ts b/integration/rabbitmq/e2e/subscribe.e2e-spec.ts index 155529f83..d5f6e1041 100644 --- a/integration/rabbitmq/e2e/subscribe.e2e-spec.ts +++ b/integration/rabbitmq/e2e/subscribe.e2e-spec.ts @@ -5,6 +5,7 @@ import { } from '@golevelup/nestjs-rabbitmq'; import { INestApplication, Injectable } from '@nestjs/common'; import { Test } from '@nestjs/testing'; +import { flatten, times } from 'lodash'; const testHandler = jest.fn(); @@ -13,6 +14,14 @@ const routingKey1 = 'testSubscribeRoute1'; const routingKey2 = 'testSubscribeRoute2'; const nonJsonRoutingKey = 'nonJsonSubscribeRoute'; +const createRoutingKey = 'test.create.object'; +const updateRoutingKey = 'test.update.object'; +const deleteRoutingKey = 'test.delete.object'; + +const createHandler = jest.fn(); +const updateHandler = jest.fn(); +const deleteHandler = jest.fn(); + @Injectable() class SubscribeService { @RabbitSubscribe({ @@ -33,6 +42,36 @@ class SubscribeService { nonJsonHandleSubscribe(message: any) { testHandler(message); } + + @RabbitSubscribe({ + exchange, + routingKey: [createRoutingKey], + queue: 'create', + allowNonJsonMessages: true, + }) + create(message: any) { + createHandler(message); + } + + @RabbitSubscribe({ + exchange, + routingKey: [updateRoutingKey], + queue: 'update', + allowNonJsonMessages: true, + }) + update(message: any) { + updateHandler(message); + } + + @RabbitSubscribe({ + exchange, + routingKey: [deleteRoutingKey], + queue: 'delete', + allowNonJsonMessages: true, + }) + delete(message: any) { + deleteHandler(message); + } } describe('Rabbit Subscribe', () => { @@ -42,7 +81,7 @@ describe('Rabbit Subscribe', () => { const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost'; const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`; - beforeEach(async () => { + beforeAll(async () => { const moduleFixture = await Test.createTestingModule({ providers: [SubscribeService], imports: [ @@ -59,18 +98,26 @@ describe('Rabbit Subscribe', () => { ], }).compile(); - jest.resetAllMocks(); - app = moduleFixture.createNestApplication(); amqpConnection = app.get(AmqpConnection); await app.init(); }); + afterAll(async () => { + await app.close(); + }); + + beforeEach(() => { + jest.resetAllMocks(); + }); + it('should receive subscribe messages and handle them', async (done) => { [routingKey1, routingKey2, nonJsonRoutingKey].forEach((x, i) => amqpConnection.publish(exchange, x, `testMessage-${i}`), ); + expect.assertions(4); + setTimeout(() => { expect(testHandler).toHaveBeenCalledTimes(3); expect(testHandler).toHaveBeenCalledWith(`testMessage-0`); @@ -80,6 +127,30 @@ describe('Rabbit Subscribe', () => { }, 50); }); + it('should work with a topic exchange set up that has multiple subscribers with similar routing keys', async (done) => { + const routingKeys = [createRoutingKey, updateRoutingKey, deleteRoutingKey]; + + const promises = flatten( + routingKeys.map((key) => { + return times(100).map((x) => amqpConnection.publish(exchange, key, x)); + }), + ); + + await Promise.all(promises); + + expect.assertions(303); + + setTimeout(() => { + expect(createHandler).toHaveBeenCalledTimes(100); + times(100).forEach((x) => expect(createHandler).toHaveBeenCalledWith(x)); + expect(updateHandler).toHaveBeenCalledTimes(100); + times(100).forEach((x) => expect(updateHandler).toHaveBeenCalledWith(x)); + expect(deleteHandler).toHaveBeenCalledTimes(100); + times(100).forEach((x) => expect(deleteHandler).toHaveBeenCalledWith(x)); + done(); + }, 150); + }); + it('should receive undefined argument when subscriber allows non-json messages and message is invalid', async (done) => { amqpConnection.publish(exchange, nonJsonRoutingKey, undefined); amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.alloc(0)); diff --git a/packages/rabbitmq/src/rabbitmq.module.ts b/packages/rabbitmq/src/rabbitmq.module.ts index 0bed59357..9d77f6d20 100644 --- a/packages/rabbitmq/src/rabbitmq.module.ts +++ b/packages/rabbitmq/src/rabbitmq.module.ts @@ -7,7 +7,6 @@ import { DynamicModule, Logger, Module, - OnApplicationShutdown, OnModuleInit, OnModuleDestroy, } from '@nestjs/common';