Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions src/AMQPRPCServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class AMQPRPCServer extends AMQPEndpoint {
* @param {Object} params
* @param {String} params.requestsQueue queue when AMQPRPC client sends commands, should correspond with AMQPRPCClient
* default is '' which means auto-generated queue name
* @param {String} params.prefetchCount specifies the number of commands that should be run in parallel
* default is 0 which means unlimited
*/
constructor(connection, params = {}) {
params.requestsQueue = params.requestsQueue || '';
Expand All @@ -24,6 +26,7 @@ class AMQPRPCServer extends AMQPEndpoint {

this._requestsQueue = params.requestsQueue;
this._commands = {};
this._prefetchCount = params.prefetchCount || 0;
}

/**
Expand All @@ -37,10 +40,14 @@ class AMQPRPCServer extends AMQPEndpoint {


if (this._requestsQueue === '') {
const response = await this._channel.assertQueue('', {exclusive: true});
const response = await this._channel.assertQueue('', { exclusive: true });
this._requestsQueue = response.queue;
}

if (this._channel.prefetch && this._prefetchCount !== 0) {
await this._channel.prefetch(this._prefetchCount);
}

const consumeResult = await this._channel.consume(this._requestsQueue, (msg) => this._handleMsg(msg));
this._consumerTag = consumeResult.consumerTag
}
Expand Down Expand Up @@ -80,20 +87,21 @@ class AMQPRPCServer extends AMQPEndpoint {
*/
async _handleMsg(msg) {

this._channel.ack(msg);
const replyTo = msg.properties.replyTo;
const correlationId = msg.properties.correlationId;
const persistent = msg.properties.deliveryMode !== 1;

try {
const result = await this._dispatchCommand(msg);

const content = new CommandResult(CommandResult.STATES.SUCCESS, result).pack();
this._channel.sendToQueue(replyTo, content, {correlationId, persistent});

await this._channel.sendToQueue(replyTo, content, {correlationId, persistent});
await this._channel.ack(msg);
} catch (error) {
const content = new CommandResult(CommandResult.STATES.ERROR, error).pack();
this._channel.sendToQueue(replyTo, content, {correlationId, persistent});
if (this._channel) {
const content = new CommandResult(CommandResult.STATES.ERROR, error).pack();
await this._channel.sendToQueue(replyTo, content, {correlationId, persistent});
await this._channel.reject(msg);
}
}
}

Expand Down
22 changes: 22 additions & 0 deletions test/integration/AMQPRPC.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {AMQPRPCClient, AMQPRPCServer} = require('../..');
const helpers = require('../helpers.js');

const E_CUSTOM_ERROR_CODE = 'E_CUSTOM_ERROR_CODE';
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function getResultExample() {
return {
Expand Down Expand Up @@ -139,6 +140,27 @@ describe('AMQPRPCClient to AMQPRPCServer', () => {
.and.eventually.have.property('code', E_CUSTOM_ERROR_CODE);
});
});
describe('when client send multiples commands', () => {
it('should run the command method multiple times in parallel', async () => {
const results = [];
let times = 0;
server.addCommand('command', async () => {
const i = times++;
// first message is slower than second;
if (i === 0) {
await sleep(30);
}
results.push(i);
});

await Promise.all([
client.sendCommand('command'),
client.sendCommand('command')
]);

expect(results).to.deep.equal([1, 0]);
});
});

describe('AMQPRPCClient', () => {
it('Should handle timeouts', async () => {
Expand Down
53 changes: 53 additions & 0 deletions test/integration/parallelism.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict';

const chai = require('chai');
const chaiAsPromised = require("chai-as-promised");
chai.use(chaiAsPromised);
const {expect} = chai;

const {AMQPRPCClient, AMQPRPCServer} = require('../..');
const helpers = require('../helpers.js');
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));


describe('AMQPServer with prefetchCount = 1', () => {
let connection;
let client;
let server;

beforeEach(async () => {
connection = await helpers.getAmqpConnection();
server = new AMQPRPCServer(connection, { prefetchCount: 1 });
await server.start();
client = new AMQPRPCClient(connection, { requestsQueue: server.requestsQueue, timeout: 200 });
await client.start();
});

afterEach(async () => {
await server.disconnect();
await client.disconnect();
await helpers.closeAmqpConnection();
});

describe('when client send multiples command before finishing', () => {
it('should process one command at the time', async () => {
const results = [];
let times = 0;
server.addCommand('command', async () => {
const i = times++;
if (i === 0) {
// first message is slower;
await sleep(40);
}
results.push(i);
});

await Promise.all([
client.sendCommand('command'),
client.sendCommand('command')
]);

expect(results).to.deep.equal([0, 1]);
});
});
});