Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions examples/rabbitmq-rpc-with-direct-reply.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/* eslint-disable no-console */
/* eslint-disable import/no-extraneous-dependencies */

//For more information about how this work, please read this:
// https://www.rabbitmq.com/direct-reply-to.html

const amqplib = require('amqplib');
const { AMQPRPCClient, AMQPRPCServer } = require('..');


function delay(ms) {
return new Promise(resolve => {
setTimeout(resolve, ms);
})
}

/**
*
* @return {Promise<String>} queueName when server listens on for requests
*/
async function initServer() {
console.log('Server starting');
const connection = await amqplib.connect('amqp://localhost');
const server = new AMQPRPCServer(connection);

server.addCommand('hello', (name) => ({message: `Hello, ${name}!`}));

server.addCommand('get-time', () => ({time: new Date()}));

await server.start();
console.log('Server is ready');
return server.requestsQueue;
}

/**
*
* @param requestsQueue
* @return {Promise<void>}
*/
async function initClient1(requestsQueue) {
console.log('Tom starting');
const connection = await amqplib.connect('amqp://localhost');
const client = new AMQPRPCClient(connection, {
requestsQueue,
repliesQueue: 'amq.rabbitmq.reply-to',
consumeOptions: {
noAck: true
}
});
await client.start();

const response1 = await client.sendCommand('hello', ['Tom']);
console.log(`Tom got hello response ${response1.message}`);

await delay(100);

const response2 = await client.sendCommand('get-time', []);
console.log(`Tom got 1st response for get-time: ${response2.time}`);

await delay(100);

const response3 = await client.sendCommand('get-time', []);
console.log(`Tom got 2nd response for get-time: ${response3.time}`);
}

async function initClient2(requestsQueue) {
console.log('Alisa starting');
const connection = await amqplib.connect('amqp://localhost');
const client = new AMQPRPCClient(connection, {
requestsQueue,
repliesQueue: 'amq.rabbitmq.reply-to',
consumeOptions: {
noAck: true
}
});
await client.start();

const response1 = await client.sendCommand('hello', ['Alisa']);
console.log(`Alisa got hello response ${response1.message}`);

await delay(150);

const response2 = await client.sendCommand('get-time', []);
console.log(`Alisa got response for get-time: ${response2.time}`);
}


(async function main() {
console.info('\n launch server:\n');
const tmpQueueName = await initServer();

console.info('\n launch clients:\n');
await Promise.all([
initClient1(tmpQueueName),
initClient2(tmpQueueName)
]);
})().catch(console.error.bind(console, 'General error:'));

13 changes: 11 additions & 2 deletions src/AMQPRPCClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class AMQPRPCClient extends AMQPEndpoint {
* default is '' which means auto-generated queue name
* @param {Number} [params.timeout=60000] Timeout for cases when server is not responding
* @param {Object} [params.defaultMessageOptions] additional options for publishing the request to the queue
* @param {Object} [params.consumeOptions={}] Additional options to use in the channel.consume method
*/
constructor(connection, params = {}) {
params.repliesQueue = params.repliesQueue || '';
Expand All @@ -32,6 +33,7 @@ class AMQPRPCClient extends AMQPEndpoint {
this._repliesQueue = params.repliesQueue;
this._cmdNumber = 0;
this._requests = new Map();
this._consumeOptions = params.consumeOptions || {};
this._defaultMessageOptions = params.defaultMessageOptions || {};
}

Expand Down Expand Up @@ -96,7 +98,11 @@ class AMQPRPCClient extends AMQPEndpoint {
this._repliesQueue = response.queue;
}

const consumeResult = await this._channel.consume(this._repliesQueue, (msg) => this._dispatchReply(msg));
const consumeResult = await this._channel.consume(
this._repliesQueue,
(msg) => this._dispatchReply(msg),
this._consumeOptions);

this._consumerTag = consumeResult.consumerTag
}

Expand Down Expand Up @@ -125,7 +131,10 @@ class AMQPRPCClient extends AMQPEndpoint {
* @returns {Promise}
*/
async _dispatchReply(msg) {
this._channel.ack(msg);
if (!this._consumeOptions.noAck) {
this._channel.ack(msg);
}

if (!msg) {
//skip, it's queue close message
return;
Expand Down
18 changes: 18 additions & 0 deletions test/unit/AMQPRPCClient.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ describe('AMQPRPCClient', () => {
expect(client.repliesQueue).to.equal(repliesQueue);
});

it('should use consumerOptions if provided', async () => {
const repliesQueue = 'qq';
const client = new AMQPRPCClient(connectionStub, {
repliesQueue,
requestsQueue: 'q',
consumeOptions: { noAck: true }
});
await client.start();
expect(channelStub.assertQueue).not.to.be.called;
expect(client.repliesQueue).to.equal(repliesQueue);
expect(channelStub.consume).to.be.calledOnce
.and.calledWithMatch(
sinon.match(repliesQueue),
sinon.match.func,
sinon.match({ noAck: true })
);
});

it('should start listening from queue', async () => {
const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'});
let consumerMethod;
Expand Down