From 4b751a84f8b8422aed5c46d3baf23f6b0e94f678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=C2=A0F=2E=20Romaniello?= Date: Tue, 4 Sep 2018 15:16:41 -0300 Subject: [PATCH] verify the replyTo queue still exists before running the command Add a parameter to verify if the `replyTo` queue still exists before running the command. --- src/AMQPEndpoint.js | 3 ++- src/AMQPRPCClient.js | 3 +++ src/AMQPRPCServer.js | 38 ++++++++++++++++++++++++++++++++ test/integration/AMQPRPC.test.js | 20 +++++++++++++++++ 4 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/AMQPEndpoint.js b/src/AMQPEndpoint.js index 7d6fc86..51c2182 100644 --- a/src/AMQPEndpoint.js +++ b/src/AMQPEndpoint.js @@ -42,8 +42,9 @@ class AMQPEndpoint { */ async disconnect() { if (!this._channel) return; - await this._channel.close(); + const channel = this._channel; this._channel = null; + await channel.close(); } } diff --git a/src/AMQPRPCClient.js b/src/AMQPRPCClient.js index 9f36648..102e303 100644 --- a/src/AMQPRPCClient.js +++ b/src/AMQPRPCClient.js @@ -106,6 +106,9 @@ class AMQPRPCClient extends AMQPEndpoint { * @returns {Promise} */ async disconnect() { + if (!this._channel) { + return; + } await this._channel.cancel(this._consumerTag); if (this._params.repliesQueue === '') { diff --git a/src/AMQPRPCServer.js b/src/AMQPRPCServer.js index 489968d..99432a8 100644 --- a/src/AMQPRPCServer.js +++ b/src/AMQPRPCServer.js @@ -16,6 +16,7 @@ class AMQPRPCServer extends AMQPEndpoint { * @param {Object} params * @param {String} params.requestsQueue queue when AMQPRPC client sends commands, should correspond with AMQPRPCClient * default is '' which means auto-generated queue name + * @param {Boolean} params.verifyReplyQueue verify that the reply queue exists before handling a request */ constructor(connection, params = {}) { params.requestsQueue = params.requestsQueue || ''; @@ -24,6 +25,7 @@ class AMQPRPCServer extends AMQPEndpoint { this._requestsQueue = params.requestsQueue; this._commands = {}; + this._verifyReplyQueue = params.verifyReplyQueue; } /** @@ -74,6 +76,33 @@ class AMQPRPCServer extends AMQPEndpoint { return this; } + /** + * @private + * + * this is an internal method used to verify if the + * reply queue still exists by using a disposable channel. + * + * Extracted from rabbitmq documentation: https://www.rabbitmq.com/direct-reply-to.html + * + * If the RPC server is going to perform some expensive computation + * it might wish to check if the client has gone away. To do this the + * server can declare the generated reply name first on a disposable + * channel in order to determine whether it still exists. + */ + async _checkQueue(queueName) { + const channel = await this._connection.createChannel(); + let result = true; + try { + //we need to handle this error otherwise it breaks the application + channel.on('error', () => { result = false; }); + await channel.checkQueue(queueName); + } catch(err) { + result = false; + } + await channel.close(); + return result; + } + /** * * @private @@ -86,6 +115,15 @@ class AMQPRPCServer extends AMQPEndpoint { const persistent = msg.properties.deliveryMode !== 1; try { + if (this._verifyReplyQueue) { + const replyQueueExists = await this._checkQueue(replyTo); + if (!replyQueueExists) { + //This means that the client has disposed the reply queue + //before we finished. + return; + } + } + const result = await this._dispatchCommand(msg); const content = new CommandResult(CommandResult.STATES.SUCCESS, result).pack(); diff --git a/test/integration/AMQPRPC.test.js b/test/integration/AMQPRPC.test.js index 05d0a85..1695060 100644 --- a/test/integration/AMQPRPC.test.js +++ b/test/integration/AMQPRPC.test.js @@ -56,6 +56,26 @@ describe('AMQPRPCClient to AMQPRPCServer', () => { expect(commandStub).to.have.callCount(1); expect(commandStub.getCall(0).args).to.deep.equal([]); }); + it('should not invoke the command if client disconnects', async () => { + server._verifyReplyQueue = true; + const commandStub = sinon.stub(); + server.addCommand('command', commandStub); + + //disconnect the client when the req reaches + //the server but before dispatching to the handler + server._handleMsg = (function(original) { + return async function(...args) { + await client.disconnect(); + return await original.apply(server, args); + }; + })(server._handleMsg); + + await expect(client.sendCommand('command')) + .to.be.rejectedWith(Error) + .and.eventually.have.property('message', 'sendCommand canceled due to client disconnect, command:command, correlationId:0'); + + expect(commandStub).to.have.callCount(0); + }); it('should delete timedout requests from map', async () => { server.addCommand('command', () => new Promise(sinon.stub())); await expect(client.sendCommand('command')).to.be.rejectedWith(Error);