Skip to content

Commit e02da41

Browse files
committed
allow additional consume options
1 parent 2b3fcea commit e02da41

File tree

3 files changed

+127
-2
lines changed

3 files changed

+127
-2
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/* eslint-disable no-console */
2+
/* eslint-disable import/no-extraneous-dependencies */
3+
4+
//For more information about how this work, please read this:
5+
// https://www.rabbitmq.com/direct-reply-to.html
6+
7+
const amqplib = require('amqplib');
8+
const { AMQPRPCClient, AMQPRPCServer } = require('..');
9+
10+
11+
function delay(ms) {
12+
return new Promise(resolve => {
13+
setTimeout(resolve, ms);
14+
})
15+
}
16+
17+
/**
18+
*
19+
* @return {Promise<String>} queueName when server listens on for requests
20+
*/
21+
async function initServer() {
22+
console.log('Server starting');
23+
const connection = await amqplib.connect('amqp://localhost');
24+
const server = new AMQPRPCServer(connection);
25+
26+
server.addCommand('hello', (name) => ({message: `Hello, ${name}!`}));
27+
28+
server.addCommand('get-time', () => ({time: new Date()}));
29+
30+
await server.start();
31+
console.log('Server is ready');
32+
return server.requestsQueue;
33+
}
34+
35+
/**
36+
*
37+
* @param requestsQueue
38+
* @return {Promise<void>}
39+
*/
40+
async function initClient1(requestsQueue) {
41+
console.log('Tom starting');
42+
const connection = await amqplib.connect('amqp://localhost');
43+
const client = new AMQPRPCClient(connection, {
44+
requestsQueue,
45+
repliesQueue: 'amq.rabbitmq.reply-to',
46+
consumeOptions: {
47+
noAck: true
48+
}
49+
});
50+
await client.start();
51+
52+
const response1 = await client.sendCommand('hello', ['Tom']);
53+
console.log(`Tom got hello response ${response1.message}`);
54+
55+
await delay(100);
56+
57+
const response2 = await client.sendCommand('get-time', []);
58+
console.log(`Tom got 1st response for get-time: ${response2.time}`);
59+
60+
await delay(100);
61+
62+
const response3 = await client.sendCommand('get-time', []);
63+
console.log(`Tom got 2nd response for get-time: ${response3.time}`);
64+
}
65+
66+
async function initClient2(requestsQueue) {
67+
console.log('Alisa starting');
68+
const connection = await amqplib.connect('amqp://localhost');
69+
const client = new AMQPRPCClient(connection, {
70+
requestsQueue,
71+
repliesQueue: 'amq.rabbitmq.reply-to',
72+
consumeOptions: {
73+
noAck: true
74+
}
75+
});
76+
await client.start();
77+
78+
const response1 = await client.sendCommand('hello', ['Alisa']);
79+
console.log(`Alisa got hello response ${response1.message}`);
80+
81+
await delay(150);
82+
83+
const response2 = await client.sendCommand('get-time', []);
84+
console.log(`Alisa got response for get-time: ${response2.time}`);
85+
}
86+
87+
88+
(async function main() {
89+
console.info('\n launch server:\n');
90+
const tmpQueueName = await initServer();
91+
92+
console.info('\n launch clients:\n');
93+
await Promise.all([
94+
initClient1(tmpQueueName),
95+
initClient2(tmpQueueName)
96+
]);
97+
})().catch(console.error.bind(console, 'General error:'));
98+

src/AMQPRPCClient.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class AMQPRPCClient extends AMQPEndpoint {
1919
* default is '' which means auto-generated queue name
2020
* @param {Number} [params.timeout=60000] Timeout for cases when server is not responding
2121
* @param {Object} [params.defaultMessageOptions] additional options for publishing the request to the queue
22+
* @param {Object} [params.consumeOptions={}] Additional options to use in the channel.consume method
2223
*/
2324
constructor(connection, params = {}) {
2425
params.repliesQueue = params.repliesQueue || '';
@@ -32,6 +33,7 @@ class AMQPRPCClient extends AMQPEndpoint {
3233
this._repliesQueue = params.repliesQueue;
3334
this._cmdNumber = 0;
3435
this._requests = new Map();
36+
this._consumeOptions = params.consumeOptions || {};
3537
this._defaultMessageOptions = params.defaultMessageOptions || {};
3638
}
3739

@@ -96,7 +98,11 @@ class AMQPRPCClient extends AMQPEndpoint {
9698
this._repliesQueue = response.queue;
9799
}
98100

99-
const consumeResult = await this._channel.consume(this._repliesQueue, (msg) => this._dispatchReply(msg));
101+
const consumeResult = await this._channel.consume(
102+
this._repliesQueue,
103+
(msg) => this._dispatchReply(msg),
104+
this._consumeOptions);
105+
100106
this._consumerTag = consumeResult.consumerTag
101107
}
102108

@@ -125,7 +131,10 @@ class AMQPRPCClient extends AMQPEndpoint {
125131
* @returns {Promise}
126132
*/
127133
async _dispatchReply(msg) {
128-
this._channel.ack(msg);
134+
if (!this._consumeOptions.noAck) {
135+
this._channel.ack(msg);
136+
}
137+
129138
if (!msg) {
130139
//skip, it's queue close message
131140
return;

test/unit/AMQPRPCClient.test.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,24 @@ describe('AMQPRPCClient', () => {
121121
expect(client.repliesQueue).to.equal(repliesQueue);
122122
});
123123

124+
it('should use consumerOptions if provided', async () => {
125+
const repliesQueue = 'qq';
126+
const client = new AMQPRPCClient(connectionStub, {
127+
repliesQueue,
128+
requestsQueue: 'q',
129+
consumeOptions: { noAck: true }
130+
});
131+
await client.start();
132+
expect(channelStub.assertQueue).not.to.be.called;
133+
expect(client.repliesQueue).to.equal(repliesQueue);
134+
expect(channelStub.consume).to.be.calledOnce
135+
.and.calledWithMatch(
136+
sinon.match(repliesQueue),
137+
sinon.match.func,
138+
sinon.match({ noAck: true })
139+
);
140+
});
141+
124142
it('should start listening from queue', async () => {
125143
const client = new AMQPRPCClient(connectionStub, {requestsQueue: 'q'});
126144
let consumerMethod;

0 commit comments

Comments
 (0)