diff --git a/examples/rabbitmq-rpc-with-direct-reply.js b/examples/rabbitmq-rpc-with-direct-reply.js new file mode 100644 index 0000000..ce36be1 --- /dev/null +++ b/examples/rabbitmq-rpc-with-direct-reply.js @@ -0,0 +1,98 @@ +/* eslint-disable no-console */ +/* eslint-disable import/no-extraneous-dependencies */ + +//For more information about how this work, please read this: +// https://www.rabbitmq.com/direct-reply-to.html + +const amqplib = require('amqplib'); +const { AMQPRPCClient, AMQPRPCServer } = require('..'); + + +function delay(ms) { + return new Promise(resolve => { + setTimeout(resolve, ms); + }) +} + +/** + * + * @return {Promise} queueName when server listens on for requests + */ +async function initServer() { + console.log('Server starting'); + const connection = await amqplib.connect('amqp://localhost'); + const server = new AMQPRPCServer(connection); + + server.addCommand('hello', (name) => ({message: `Hello, ${name}!`})); + + server.addCommand('get-time', () => ({time: new Date()})); + + await server.start(); + console.log('Server is ready'); + return server.requestsQueue; +} + +/** + * + * @param requestsQueue + * @return {Promise} + */ +async function initClient1(requestsQueue) { + console.log('Tom starting'); + const connection = await amqplib.connect('amqp://localhost'); + const client = new AMQPRPCClient(connection, { + requestsQueue, + repliesQueue: 'amq.rabbitmq.reply-to', + consumeOptions: { + noAck: true + } + }); + await client.start(); + + const response1 = await client.sendCommand('hello', ['Tom']); + console.log(`Tom got hello response ${response1.message}`); + + await delay(100); + + const response2 = await client.sendCommand('get-time', []); + console.log(`Tom got 1st response for get-time: ${response2.time}`); + + await delay(100); + + const response3 = await client.sendCommand('get-time', []); + console.log(`Tom got 2nd response for get-time: ${response3.time}`); +} + +async function initClient2(requestsQueue) { + console.log('Alisa starting'); + const connection = await amqplib.connect('amqp://localhost'); + const client = new AMQPRPCClient(connection, { + requestsQueue, + repliesQueue: 'amq.rabbitmq.reply-to', + consumeOptions: { + noAck: true + } + }); + await client.start(); + + const response1 = await client.sendCommand('hello', ['Alisa']); + console.log(`Alisa got hello response ${response1.message}`); + + await delay(150); + + const response2 = await client.sendCommand('get-time', []); + console.log(`Alisa got response for get-time: ${response2.time}`); +} + + +(async function main() { + console.info('\n launch server:\n'); + const tmpQueueName = await initServer(); + + console.info('\n launch clients:\n'); + await Promise.all([ + initClient1(tmpQueueName), + initClient2(tmpQueueName) + ]); +})().catch(console.error.bind(console, 'General error:')); + diff --git a/src/AMQPRPCClient.js b/src/AMQPRPCClient.js index 9f36648..93a80ae 100644 --- a/src/AMQPRPCClient.js +++ b/src/AMQPRPCClient.js @@ -19,6 +19,7 @@ class AMQPRPCClient extends AMQPEndpoint { * default is '' which means auto-generated queue name * @param {Number} [params.timeout=60000] Timeout for cases when server is not responding * @param {Object} [params.defaultMessageOptions] additional options for publishing the request to the queue + * @param {Object} [params.consumeOptions={}] Additional options to use in the channel.consume method */ constructor(connection, params = {}) { params.repliesQueue = params.repliesQueue || ''; @@ -32,6 +33,7 @@ class AMQPRPCClient extends AMQPEndpoint { this._repliesQueue = params.repliesQueue; this._cmdNumber = 0; this._requests = new Map(); + this._consumeOptions = params.consumeOptions || {}; this._defaultMessageOptions = params.defaultMessageOptions || {}; } @@ -96,7 +98,11 @@ class AMQPRPCClient extends AMQPEndpoint { this._repliesQueue = response.queue; } - const consumeResult = await this._channel.consume(this._repliesQueue, (msg) => this._dispatchReply(msg)); + const consumeResult = await this._channel.consume( + this._repliesQueue, + (msg) => this._dispatchReply(msg), + this._consumeOptions); + this._consumerTag = consumeResult.consumerTag } @@ -125,7 +131,10 @@ class AMQPRPCClient extends AMQPEndpoint { * @returns {Promise} */ async _dispatchReply(msg) { - this._channel.ack(msg); + if (!this._consumeOptions.noAck) { + this._channel.ack(msg); + } + if (!msg) { //skip, it's queue close message return; diff --git a/test/unit/AMQPRPCClient.test.js b/test/unit/AMQPRPCClient.test.js index 11c93ce..aecb866 100644 --- a/test/unit/AMQPRPCClient.test.js +++ b/test/unit/AMQPRPCClient.test.js @@ -121,6 +121,24 @@ describe('AMQPRPCClient', () => { expect(client.repliesQueue).to.equal(repliesQueue); }); + it('should use consumerOptions if provided', async () => { + const repliesQueue = 'qq'; + const client = new AMQPRPCClient(connectionStub, { + repliesQueue, + requestsQueue: 'q', + consumeOptions: { noAck: true } + }); + await client.start(); + expect(channelStub.assertQueue).not.to.be.called; + expect(client.repliesQueue).to.equal(repliesQueue); + expect(channelStub.consume).to.be.calledOnce + .and.calledWithMatch( + sinon.match(repliesQueue), + sinon.match.func, + sinon.match({ noAck: true }) + ); + }); + it('should start listening from queue', async () => { const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'}); let consumerMethod;