Skip to content

Commit

Permalink
Merge pull request golevelup#205 from golevelup/chore/tests-to-suppor…
Browse files Browse the repository at this point in the history
…t-204

test(rabbitmq): additional integration test coverage
  • Loading branch information
WonderPanda authored Dec 7, 2020
2 parents af05b72 + 4090f4c commit 5c3e095
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 10 deletions.
5 changes: 4 additions & 1 deletion integration/rabbitmq/e2e/configuration.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ class RabbitConfig {
describe('Module Configuration', () => {
let app: TestingModule;

afterEach(() => jest.clearAllMocks());
afterEach(async () => {
jest.clearAllMocks();
await app.close();
});

describe('forRoot', () => {
it('should configure RabbitMQ', async () => {
Expand Down
8 changes: 6 additions & 2 deletions integration/rabbitmq/e2e/rpc-no-direct-reply.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('Rabbit Direct Reply To', () => {
const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`;

beforeEach(async () => {
beforeAll(async () => {
const moduleFixture = await Test.createTestingModule({
providers: [RpcService],
imports: [
Expand All @@ -55,7 +55,11 @@ describe('Rabbit Direct Reply To', () => {
await app.init();
});

it('should not receive subscribe messages because register handlers is disabled', async done => {
afterAll(async () => {
await app.close();
});

it('should not receive subscribe messages because register handlers is disabled', async (done) => {
await expect(
amqpConnection.request({
exchange,
Expand Down
12 changes: 9 additions & 3 deletions integration/rabbitmq/e2e/subscribe-no-handlers.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe('Rabbit Subscribe Without Register Handlers', () => {
const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`;

beforeEach(async () => {
beforeAll(async () => {
const moduleFixture = await Test.createTestingModule({
providers: [SubscribeService],
imports: [
Expand All @@ -54,14 +54,20 @@ describe('Rabbit Subscribe Without Register Handlers', () => {
await app.init();
});

it('should not receive subscribe messages because register handlers is disabled', async done => {
afterAll(async () => {
await app.close();
});

it('should not receive subscribe messages because register handlers is disabled', async (done) => {
[routingKey1, routingKey2].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

expect.assertions(1);

setTimeout(() => {
expect(testHandler).not.toHaveBeenCalled();
done();
}, 50);
}, 100);
});
});
77 changes: 74 additions & 3 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from '@golevelup/nestjs-rabbitmq';
import { INestApplication, Injectable } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { flatten, times } from 'lodash';

const testHandler = jest.fn();

Expand All @@ -13,6 +14,14 @@ const routingKey1 = 'testSubscribeRoute1';
const routingKey2 = 'testSubscribeRoute2';
const nonJsonRoutingKey = 'nonJsonSubscribeRoute';

const createRoutingKey = 'test.create.object';
const updateRoutingKey = 'test.update.object';
const deleteRoutingKey = 'test.delete.object';

const createHandler = jest.fn();
const updateHandler = jest.fn();
const deleteHandler = jest.fn();

@Injectable()
class SubscribeService {
@RabbitSubscribe({
Expand All @@ -33,6 +42,36 @@ class SubscribeService {
nonJsonHandleSubscribe(message: any) {
testHandler(message);
}

@RabbitSubscribe({
exchange,
routingKey: [createRoutingKey],
queue: 'create',
allowNonJsonMessages: true,
})
create(message: any) {
createHandler(message);
}

@RabbitSubscribe({
exchange,
routingKey: [updateRoutingKey],
queue: 'update',
allowNonJsonMessages: true,
})
update(message: any) {
updateHandler(message);
}

@RabbitSubscribe({
exchange,
routingKey: [deleteRoutingKey],
queue: 'delete',
allowNonJsonMessages: true,
})
delete(message: any) {
deleteHandler(message);
}
}

describe('Rabbit Subscribe', () => {
Expand All @@ -42,7 +81,7 @@ describe('Rabbit Subscribe', () => {
const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`;

beforeEach(async () => {
beforeAll(async () => {
const moduleFixture = await Test.createTestingModule({
providers: [SubscribeService],
imports: [
Expand All @@ -59,18 +98,26 @@ describe('Rabbit Subscribe', () => {
],
}).compile();

jest.resetAllMocks();

app = moduleFixture.createNestApplication();
amqpConnection = app.get<AmqpConnection>(AmqpConnection);
await app.init();
});

afterAll(async () => {
await app.close();
});

beforeEach(() => {
jest.resetAllMocks();
});

it('should receive subscribe messages and handle them', async (done) => {
[routingKey1, routingKey2, nonJsonRoutingKey].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

expect.assertions(4);

setTimeout(() => {
expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenCalledWith(`testMessage-0`);
Expand All @@ -80,6 +127,30 @@ describe('Rabbit Subscribe', () => {
}, 50);
});

it('should work with a topic exchange set up that has multiple subscribers with similar routing keys', async (done) => {
const routingKeys = [createRoutingKey, updateRoutingKey, deleteRoutingKey];

const promises = flatten(
routingKeys.map((key) => {
return times(100).map((x) => amqpConnection.publish(exchange, key, x));
}),
);

await Promise.all(promises);

expect.assertions(303);

setTimeout(() => {
expect(createHandler).toHaveBeenCalledTimes(100);
times(100).forEach((x) => expect(createHandler).toHaveBeenCalledWith(x));
expect(updateHandler).toHaveBeenCalledTimes(100);
times(100).forEach((x) => expect(updateHandler).toHaveBeenCalledWith(x));
expect(deleteHandler).toHaveBeenCalledTimes(100);
times(100).forEach((x) => expect(deleteHandler).toHaveBeenCalledWith(x));
done();
}, 150);
});

it('should receive undefined argument when subscriber allows non-json messages and message is invalid', async (done) => {
amqpConnection.publish(exchange, nonJsonRoutingKey, undefined);
amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.alloc(0));
Expand Down
1 change: 0 additions & 1 deletion packages/rabbitmq/src/rabbitmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
DynamicModule,
Logger,
Module,
OnApplicationShutdown,
OnModuleInit,
OnModuleDestroy,
} from '@nestjs/common';
Expand Down

0 comments on commit 5c3e095

Please sign in to comment.