diff --git a/src/AMQPRPCServer.js b/src/AMQPRPCServer.js index 489968d..4e3a3a0 100644 --- a/src/AMQPRPCServer.js +++ b/src/AMQPRPCServer.js @@ -16,6 +16,8 @@ class AMQPRPCServer extends AMQPEndpoint { * @param {Object} params * @param {String} params.requestsQueue queue when AMQPRPC client sends commands, should correspond with AMQPRPCClient * default is '' which means auto-generated queue name + * @param {String} params.prefetchCount specifies the number of commands that should be run in parallel + * default is 0 which means unlimited */ constructor(connection, params = {}) { params.requestsQueue = params.requestsQueue || ''; @@ -24,6 +26,7 @@ class AMQPRPCServer extends AMQPEndpoint { this._requestsQueue = params.requestsQueue; this._commands = {}; + this._prefetchCount = params.prefetchCount || 0; } /** @@ -37,10 +40,14 @@ class AMQPRPCServer extends AMQPEndpoint { if (this._requestsQueue === '') { - const response = await this._channel.assertQueue('', {exclusive: true}); + const response = await this._channel.assertQueue('', { exclusive: true }); this._requestsQueue = response.queue; } + if (this._channel.prefetch && this._prefetchCount !== 0) { + await this._channel.prefetch(this._prefetchCount); + } + const consumeResult = await this._channel.consume(this._requestsQueue, (msg) => this._handleMsg(msg)); this._consumerTag = consumeResult.consumerTag } @@ -80,20 +87,21 @@ class AMQPRPCServer extends AMQPEndpoint { */ async _handleMsg(msg) { - this._channel.ack(msg); const replyTo = msg.properties.replyTo; const correlationId = msg.properties.correlationId; const persistent = msg.properties.deliveryMode !== 1; try { const result = await this._dispatchCommand(msg); - const content = new CommandResult(CommandResult.STATES.SUCCESS, result).pack(); - this._channel.sendToQueue(replyTo, content, {correlationId, persistent}); - + await this._channel.sendToQueue(replyTo, content, {correlationId, persistent}); + await this._channel.ack(msg); } catch (error) { - const content = new CommandResult(CommandResult.STATES.ERROR, error).pack(); - this._channel.sendToQueue(replyTo, content, {correlationId, persistent}); + if (this._channel) { + const content = new CommandResult(CommandResult.STATES.ERROR, error).pack(); + await this._channel.sendToQueue(replyTo, content, {correlationId, persistent}); + await this._channel.reject(msg); + } } } diff --git a/test/integration/AMQPRPC.test.js b/test/integration/AMQPRPC.test.js index 05d0a85..026f1af 100644 --- a/test/integration/AMQPRPC.test.js +++ b/test/integration/AMQPRPC.test.js @@ -10,6 +10,7 @@ const {AMQPRPCClient, AMQPRPCServer} = require('../..'); const helpers = require('../helpers.js'); const E_CUSTOM_ERROR_CODE = 'E_CUSTOM_ERROR_CODE'; +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); function getResultExample() { return { @@ -139,6 +140,27 @@ describe('AMQPRPCClient to AMQPRPCServer', () => { .and.eventually.have.property('code', E_CUSTOM_ERROR_CODE); }); }); + describe('when client send multiples commands', () => { + it('should run the command method multiple times in parallel', async () => { + const results = []; + let times = 0; + server.addCommand('command', async () => { + const i = times++; + // first message is slower than second; + if (i === 0) { + await sleep(30); + } + results.push(i); + }); + + await Promise.all([ + client.sendCommand('command'), + client.sendCommand('command') + ]); + + expect(results).to.deep.equal([1, 0]); + }); + }); describe('AMQPRPCClient', () => { it('Should handle timeouts', async () => { diff --git a/test/integration/parallelism.test.js b/test/integration/parallelism.test.js new file mode 100644 index 0000000..975e34c --- /dev/null +++ b/test/integration/parallelism.test.js @@ -0,0 +1,53 @@ +'use strict'; + +const chai = require('chai'); +const chaiAsPromised = require("chai-as-promised"); +chai.use(chaiAsPromised); +const {expect} = chai; + +const {AMQPRPCClient, AMQPRPCServer} = require('../..'); +const helpers = require('../helpers.js'); +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + + +describe('AMQPServer with prefetchCount = 1', () => { + let connection; + let client; + let server; + + beforeEach(async () => { + connection = await helpers.getAmqpConnection(); + server = new AMQPRPCServer(connection, { prefetchCount: 1 }); + await server.start(); + client = new AMQPRPCClient(connection, { requestsQueue: server.requestsQueue, timeout: 200 }); + await client.start(); + }); + + afterEach(async () => { + await server.disconnect(); + await client.disconnect(); + await helpers.closeAmqpConnection(); + }); + + describe('when client send multiples command before finishing', () => { + it('should process one command at the time', async () => { + const results = []; + let times = 0; + server.addCommand('command', async () => { + const i = times++; + if (i === 0) { + // first message is slower; + await sleep(40); + } + results.push(i); + }); + + await Promise.all([ + client.sendCommand('command'), + client.sendCommand('command') + ]); + + expect(results).to.deep.equal([0, 1]); + }); + }); +});