Skip to content

Commit

Permalink
feat(rabbitmq): integration tests, added option for non-json messages
Browse files Browse the repository at this point in the history
  • Loading branch information
perf2711 authored and WonderPanda committed Apr 16, 2020
1 parent e438a2a commit bc71ffa
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 29 deletions.
45 changes: 45 additions & 0 deletions integration/rabbitmq/e2e/rpc.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { AppModule } from '../src/app.module';

const nonJsonRpcRoutingKey = 'non-json-rpc';

describe('Rabbit RPC', () => {
let app: INestApplication;
let amqpConnection: AmqpConnection;
Expand Down Expand Up @@ -45,4 +47,47 @@ describe('Rabbit RPC', () => {

expect(response).toEqual({ transformed: { message: 42 } });
});

it('error reply RPC handler with non-JSON message should return an RPC error response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange1',
routingKey: 'error-reply-rpc',
payload: Buffer.from('{a:'),
});

expect(response).toHaveProperty('message');
expect(response).toMatchObject({ status: 'error' });
});

it('non-JSON RPC handler with should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange1',
routingKey: nonJsonRpcRoutingKey,
payload: {
request: 'val',
},
});

expect(response).toEqual({ echo: { request: 'val' } });
});

it('non-JSON RPC handler with undefined message should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange1',
routingKey: nonJsonRpcRoutingKey,
payload: Buffer.alloc(0),
});

expect(response).toEqual({ echo: undefined });
});

it('non-JSON RPC handler with unparsable message should receive a valid RPC response', async () => {
const response = await amqpConnection.request({
exchange: 'exchange1',
routingKey: nonJsonRpcRoutingKey,
payload: Buffer.from('{a:'),
});

expect(response).toEqual({ echo: undefined });
});
});
52 changes: 49 additions & 3 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const testHandler = jest.fn();
const exchange = 'testSubscribeExhange';
const routingKey1 = 'testSubscribeRoute1';
const routingKey2 = 'testSubscribeRoute2';
const nonJsonRoutingKey = 'nonJsonSubscribeRoute';

@Injectable()
class SubscribeService {
Expand All @@ -22,6 +23,16 @@ class SubscribeService {
handleSubscribe(message: object) {
testHandler(message);
}

@RabbitSubscribe({
exchange,
routingKey: [nonJsonRoutingKey],
queue: 'subscribeQueue',
allowNonJsonMessages: true,
})
nonJsonHandleSubscribe(message: any) {
testHandler(message);
}
}

describe('Rabbit Subscribe', () => {
Expand All @@ -48,21 +59,56 @@ describe('Rabbit Subscribe', () => {
],
}).compile();

jest.resetAllMocks();

app = moduleFixture.createNestApplication();
amqpConnection = app.get<AmqpConnection>(AmqpConnection);
await app.init();
});

it('should receive subscribe messages and handle them', async done => {
[routingKey1, routingKey2].forEach((x, i) =>
it('should receive subscribe messages and handle them', async (done) => {
[routingKey1, routingKey2, nonJsonRoutingKey].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

setTimeout(() => {
expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenCalledWith(`testMessage-0`);
expect(testHandler).toHaveBeenCalledWith(`testMessage-1`);
expect(testHandler).toHaveBeenCalledWith(`testMessage-2`);
done();
}, 50);
});

it('should receive undefined argument when subscriber allows non-json messages and message is invalid', async (done) => {
amqpConnection.publish(exchange, nonJsonRoutingKey, undefined);
amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.alloc(0));
amqpConnection.publish(exchange, nonJsonRoutingKey, Buffer.from('{a:'));

setTimeout(() => {
expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenNthCalledWith(1, undefined);
expect(testHandler).toHaveBeenNthCalledWith(2, undefined);
expect(testHandler).toHaveBeenNthCalledWith(3, undefined);
done();
}, 50);
});

// it('should receive undefined argument when subscriber allows non-json messages and message is empty', async (done) => {

// setTimeout(() => {
// expect(testHandler).toHaveBeenCalledTimes(1);
// expect(testHandler).toHaveBeenCalledWith(undefined);
// done();
// }, 50);
// });

// it('should receive undefined argument when subscriber allows non-json messages and message is unparseable by JSON', async (done) => {

// setTimeout(() => {
// expect(testHandler).toHaveBeenCalledTimes(1);
// expect(testHandler).toHaveBeenCalledWith(undefined);
// done();
// }, 50);
// });
});
28 changes: 27 additions & 1 deletion integration/rabbitmq/src/rpc/rpc.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { RabbitRPC } from '@golevelup/nestjs-rabbitmq';
import {
RabbitRPC,
MessageHandlerErrorBehavior,
} from '@golevelup/nestjs-rabbitmq';
import { Injectable, UseInterceptors } from '@nestjs/common';
import { TransformInterceptor } from '../transform.interceptor';
import { RpcException } from '@nestjs/microservices';

@Injectable()
export class RpcService {
Expand All @@ -26,4 +30,26 @@ export class RpcService {
message: 42,
};
}

@RabbitRPC({
routingKey: 'error-reply-rpc',
exchange: 'exchange1',
queue: 'error-reply-rpc',
errorBehavior: MessageHandlerErrorBehavior.REPLYERRORANDACK,
})
errorReplyRpc(message: object) {
throw new RpcException(message);
}

@RabbitRPC({
routingKey: 'non-json-rpc',
exchange: 'exchange1',
queue: 'non-json-rpc',
allowNonJsonMessages: true,
})
nonJsonRpc(nonJsonMessage: any) {
return {
echo: nonJsonMessage,
};
}
}
71 changes: 46 additions & 25 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ export class AmqpConnection {
msgOptions: MessageHandlerOptions,
channel: amqplib.ConfirmChannel
): Promise<void> {
const { exchange, routingKey } = msgOptions;
const { exchange, routingKey, allowNonJsonMessages } = msgOptions;

const { queue } = await channel.assertQueue(
msgOptions.queue || '',
Expand All @@ -252,7 +252,12 @@ export class AmqpConnection {
throw new Error('Received null message');
}

const response = await this.handleMessage(handler, msg);
const response = await this.handleMessage(
handler,
msg,
allowNonJsonMessages
);

if (response instanceof Nack) {
channel.nack(msg, false, response.requeue);
return;
Expand Down Expand Up @@ -281,10 +286,6 @@ export class AmqpConnection {
channel.nack(msg, false, true);
break;
}
case MessageHandlerErrorBehavior.REPLYERRORANDACK: {
this.handleReplyAndAckError(channel, msg, e);
break;
}
default:
channel.nack(msg, false, false);
}
Expand Down Expand Up @@ -313,7 +314,7 @@ export class AmqpConnection {
rpcOptions: MessageHandlerOptions,
channel: amqplib.ConfirmChannel
) {
const { exchange, routingKey } = rpcOptions;
const { exchange, routingKey, allowNonJsonMessages } = rpcOptions;

const { queue } = await channel.assertQueue(
rpcOptions.queue || '',
Expand All @@ -332,7 +333,12 @@ export class AmqpConnection {
throw new Error('Received null message');
}

const response = await this.handleMessage(handler, msg);
const response = await this.handleMessage(
handler,
msg,
allowNonJsonMessages
);

if (response instanceof Nack) {
channel.nack(msg, false, response.requeue);
return;
Expand Down Expand Up @@ -381,12 +387,18 @@ export class AmqpConnection {
throw new Error('AMQP connection is not available');
}

this._channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(message)),
options
);
let buffer: Buffer;
if (message instanceof Buffer) {
buffer = message;
} else if (message instanceof Uint8Array) {
buffer = Buffer.from(message);
} else if (message != null) {
buffer = Buffer.from(JSON.stringify(message));
} else {
buffer = Buffer.alloc(0);
}

this._channel.publish(exchange, routingKey, buffer, options);
}

private handleReplyAndAckError(
Expand All @@ -398,6 +410,7 @@ export class AmqpConnection {
const { replyTo, correlationId } = msg.properties;
if (replyTo) {
this.publishError('', replyTo, error, { correlationId });
channel.ack(msg);
} else {
channel.nack(msg, false, false);
}
Expand All @@ -413,30 +426,38 @@ export class AmqpConnection {
options?: amqplib.Options.Publish
) {
if (error instanceof Error) {
error = {
name: error.name,
message: error.message,
stack: error.stack,
};
error = error.message;
} else if (typeof error !== 'string') {
error = JSON.stringify(error);
}

this.publish(exchange, routingKey, error, options);
this.publish(
exchange,
routingKey,
{ status: 'error', message: error },
options
);
}

private handleMessage<T, U>(
handler: (
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
) => Promise<U>,
msg: amqplib.ConsumeMessage
msg: amqplib.ConsumeMessage,
allowNonJsonMessages?: boolean
) {
let message: T | undefined = undefined;
if (msg.content) {
try {
if (allowNonJsonMessages) {
try {
message = JSON.parse(msg.content.toString()) as T;
} catch {
// Let handler handle parsing error, it has the raw message anyway
message = undefined;
}
} else {
message = JSON.parse(msg.content.toString()) as T;
} catch {
// Let handler handle parsing error, it has the raw message anyway
message = undefined;
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export interface MessageHandlerOptions {
queue?: string;
queueOptions?: QueueOptions;
errorBehavior?: MessageHandlerErrorBehavior;
allowNonJsonMessages?: boolean;
}

export interface ConnectionInitOptions {
Expand Down

0 comments on commit bc71ffa

Please sign in to comment.