From 85b1b67c50a357d6b0d7a5bfc3f1eb281418b391 Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Wed, 8 Apr 2020 08:44:43 +0200 Subject: [PATCH] feat(rabbitmq): added error callbacks in favor of error behaviors --- .../rabbitmq/src/rpc/reply.error.callback.ts | 20 ++++ integration/rabbitmq/src/rpc/rpc.service.ts | 4 +- packages/rabbitmq/src/amqp/connection.ts | 103 +++++++----------- packages/rabbitmq/src/rabbitmq.interfaces.ts | 13 ++- 4 files changed, 67 insertions(+), 73 deletions(-) create mode 100644 integration/rabbitmq/src/rpc/reply.error.callback.ts diff --git a/integration/rabbitmq/src/rpc/reply.error.callback.ts b/integration/rabbitmq/src/rpc/reply.error.callback.ts new file mode 100644 index 000000000..409b0b5d3 --- /dev/null +++ b/integration/rabbitmq/src/rpc/reply.error.callback.ts @@ -0,0 +1,20 @@ +import * as amqplib from 'amqplib'; + +export function ReplyErrorCallback( + channel: amqplib.Channel, + msg: amqplib.ConsumeMessage, + error: any, +) { + const { replyTo, correlationId } = msg.properties; + if (replyTo) { + if (error instanceof Error) { + error = error.message; + } else if (typeof error !== 'string') { + error = JSON.stringify(error); + } + + error = Buffer.from(JSON.stringify({ status: 'error', message: error })); + + channel.publish('', replyTo, error, { correlationId }); + } +} diff --git a/integration/rabbitmq/src/rpc/rpc.service.ts b/integration/rabbitmq/src/rpc/rpc.service.ts index 32ee723c1..46c4ec5ab 100644 --- a/integration/rabbitmq/src/rpc/rpc.service.ts +++ b/integration/rabbitmq/src/rpc/rpc.service.ts @@ -5,6 +5,7 @@ import { import { Injectable, UseInterceptors } from '@nestjs/common'; import { TransformInterceptor } from '../transform.interceptor'; import { RpcException } from '@nestjs/microservices'; +import { ReplyErrorCallback } from './reply.error.callback'; @Injectable() export class RpcService { @@ -35,7 +36,8 @@ export class RpcService { routingKey: 'error-reply-rpc', exchange: 'exchange1', queue: 'error-reply-rpc', - errorBehavior: MessageHandlerErrorBehavior.REPLYERRORANDACK, + errorBehavior: MessageHandlerErrorBehavior.ACK, + errorCallbacks: [ReplyErrorCallback], }) errorReplyRpc(message: object) { throw new RpcException(message); diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts index 0528f58ae..b0aaf43fc 100644 --- a/packages/rabbitmq/src/amqp/connection.ts +++ b/packages/rabbitmq/src/amqp/connection.ts @@ -277,18 +277,8 @@ export class AmqpConnection { const errorBehavior = msgOptions.errorBehavior || this.config.defaultSubscribeErrorBehavior; - switch (errorBehavior) { - case MessageHandlerErrorBehavior.ACK: { - channel.ack(msg); - break; - } - case MessageHandlerErrorBehavior.REQUEUE: { - channel.nack(msg, false, true); - break; - } - default: - channel.nack(msg, false, false); - } + + await this.handleError(channel, msgOptions, errorBehavior, msg, e); } } }); @@ -355,22 +345,8 @@ export class AmqpConnection { } else { const errorBehavior = rpcOptions.errorBehavior || this.config.defaultRpcErrorBehavior; - switch (errorBehavior) { - case MessageHandlerErrorBehavior.ACK: { - channel.ack(msg); - break; - } - case MessageHandlerErrorBehavior.REQUEUE: { - channel.nack(msg, false, true); - break; - } - case MessageHandlerErrorBehavior.REPLYERRORANDACK: { - this.handleReplyAndAckError(channel, msg, e); - break; - } - default: - channel.nack(msg, false, false); - } + + await this.handleError(channel, rpcOptions, errorBehavior, msg, e); } } }); @@ -401,44 +377,6 @@ export class AmqpConnection { this._channel.publish(exchange, routingKey, buffer, options); } - private handleReplyAndAckError( - channel: amqplib.Channel, - msg: amqplib.ConsumeMessage, - error: any - ) { - try { - const { replyTo, correlationId } = msg.properties; - if (replyTo) { - this.publishError('', replyTo, error, { correlationId }); - channel.ack(msg); - } else { - channel.nack(msg, false, false); - } - } catch { - channel.nack(msg, false, true); - } - } - - private publishError( - exchange: string, - routingKey: string, - error: any, - options?: amqplib.Options.Publish - ) { - if (error instanceof Error) { - error = error.message; - } else if (typeof error !== 'string') { - error = JSON.stringify(error); - } - - this.publish( - exchange, - routingKey, - { status: 'error', message: error }, - options - ); - } - private handleMessage( handler: ( msg: T | undefined, @@ -463,4 +401,37 @@ export class AmqpConnection { return handler(message, msg); } + + private async handleError( + channel: amqplib.Channel, + msgOptions: MessageHandlerOptions, + errorBehavior: MessageHandlerErrorBehavior, + msg: amqplib.Message, + error: any + ) { + if (msg == null) { + return; + } else { + try { + if (msgOptions.errorCallbacks) { + for (const callback of msgOptions.errorCallbacks) { + await callback(channel, msg, error); + } + } + } finally { + switch (errorBehavior) { + case MessageHandlerErrorBehavior.ACK: { + channel.ack(msg); + break; + } + case MessageHandlerErrorBehavior.REQUEUE: { + channel.nack(msg, false, true); + break; + } + default: + channel.nack(msg, false, false); + } + } + } + } } diff --git a/packages/rabbitmq/src/rabbitmq.interfaces.ts b/packages/rabbitmq/src/rabbitmq.interfaces.ts index 14a20d5ce..bcc89565b 100644 --- a/packages/rabbitmq/src/rabbitmq.interfaces.ts +++ b/packages/rabbitmq/src/rabbitmq.interfaces.ts @@ -37,12 +37,6 @@ export enum MessageHandlerErrorBehavior { ACK, NACK, REQUEUE, - /** - * If an exception occurs while handling the message, the error will be serialized and published on the `replyTo` queue. - * If `replyTo` is not provided, the message will be NACKed without requeueing. - * If publish fails, message will be NACKed and requeued. - */ - REPLYERRORANDACK, } export interface MessageHandlerOptions { @@ -51,9 +45,16 @@ export interface MessageHandlerOptions { queue?: string; queueOptions?: QueueOptions; errorBehavior?: MessageHandlerErrorBehavior; + errorCallbacks?: IMessageErrorCallback[]; allowNonJsonMessages?: boolean; } +export type IMessageErrorCallback = ( + channel: amqplib.Channel, + msg: amqplib.ConsumeMessage, + error: any +) => Promise | any; + export interface ConnectionInitOptions { wait?: boolean; timeout?: number;