From dd131feaa4784fe1c6c5192a8feba1a81854e5ea Mon Sep 17 00:00:00 2001 From: Jesse Carter Date: Wed, 18 Dec 2019 20:12:46 -0500 Subject: [PATCH] feat(rabbitmq): add ability to bind handlers to multiple exchange keys fix #79 --- .../rabbitmq/e2e/subscribe.e2e-spec.ts | 14 ++-- integration/rabbitmq/package.json | 14 ++-- integration/rabbitmq/yarn.lock | 66 +++++++++---------- packages/rabbitmq/src/amqp/connection.ts | 12 +++- packages/rabbitmq/src/rabbitmq.interfaces.ts | 2 +- 5 files changed, 60 insertions(+), 48 deletions(-) diff --git a/integration/rabbitmq/e2e/subscribe.e2e-spec.ts b/integration/rabbitmq/e2e/subscribe.e2e-spec.ts index 17b5e5644..e9b74af90 100644 --- a/integration/rabbitmq/e2e/subscribe.e2e-spec.ts +++ b/integration/rabbitmq/e2e/subscribe.e2e-spec.ts @@ -9,7 +9,8 @@ import { Test } from '@nestjs/testing'; const testHandler = jest.fn(); const exchange = 'testSubscribeExhange'; -const routingKey = 'testSubscribeRoute'; +const routingKey1 = 'testSubscribeRoute1'; +const routingKey2 = 'testSubscribeRoute2'; const testMessage = { messageProp: 42, }; @@ -18,7 +19,7 @@ const testMessage = { class SubscribeService { @RabbitSubscribe({ exchange, - routingKey, + routingKey: [routingKey1, routingKey2], queue: 'subscribeQueue', }) handleSubscribe(message: object) { @@ -55,11 +56,14 @@ describe('Rabbit Subscribe', () => { }); it('should receive subscribe messages and handle them', async done => { - amqpConnection.publish(exchange, routingKey, testMessage); + [routingKey1, routingKey2].forEach((x, i) => + amqpConnection.publish(exchange, x, `testMessage-${i}`), + ); setTimeout(() => { - expect(testHandler).toHaveBeenCalledTimes(1); - expect(testHandler).toHaveBeenCalledWith(testMessage); + expect(testHandler).toHaveBeenCalledTimes(2); + expect(testHandler).toHaveBeenCalledWith(`testMessage-0`); + expect(testHandler).toHaveBeenCalledWith(`testMessage-1`); done(); }, 50); }); diff --git a/integration/rabbitmq/package.json b/integration/rabbitmq/package.json index 48ae8832b..2eeff4da8 100644 --- a/integration/rabbitmq/package.json +++ b/integration/rabbitmq/package.json @@ -19,17 +19,17 @@ "test:e2e": "jest --config ../../jest-e2e.json" }, "dependencies": { - "@nestjs/common": "6.9.0", - "@nestjs/core": "6.9.0", - "@nestjs/microservices": "6.9.0", - "@nestjs/platform-express": "^6.9.0", - "@nestjs/websockets": "6.9.0", + "@nestjs/common": "6.10.12", + "@nestjs/core": "6.10.12", + "@nestjs/microservices": "6.10.12", + "@nestjs/platform-express": "^6.10.12", + "@nestjs/websockets": "6.10.12", "reflect-metadata": "0.1.13", "rimraf": "2.6.3", - "rxjs": "6.4.0" + "rxjs": "6.5.3" }, "devDependencies": { - "@nestjs/testing": "6.9.0", + "@nestjs/testing": "6.10.12", "@types/express": "4.16.1", "@types/jest": "24.0.11", "@types/node": "11.11.6", diff --git a/integration/rabbitmq/yarn.lock b/integration/rabbitmq/yarn.lock index 010987739..2fe00f300 100644 --- a/integration/rabbitmq/yarn.lock +++ b/integration/rabbitmq/yarn.lock @@ -276,55 +276,55 @@ "@types/istanbul-lib-coverage" "^1.1.0" "@types/yargs" "^12.0.9" -"@nestjs/common@6.9.0": - version "6.9.0" - resolved "https://registry.yarnpkg.com/@nestjs/common/-/common-6.9.0.tgz#00323078ff7e8585df4921af0f4f75dfde873866" - integrity sha512-dIRiKob3SkXA2JoV3/Li3vThmZAh2yEH2znM3DRXxcA+twLzM7sJNxAuVsqaVUNwZ89Piepu66fm2BFcKNlJ6A== +"@nestjs/common@6.10.12": + version "6.10.12" + resolved "https://registry.yarnpkg.com/@nestjs/common/-/common-6.10.12.tgz#3ac48b5ec305243802320c5c53480fab3a5c1820" + integrity sha512-UWQ3HqMDVAf85HNZAgahT0Ig7/4WGI1jE+UvZlNMIrLcI8y98LHwbS8fwHUxdRYW9L7NEDTYn8oBL6lS+z4OWA== dependencies: axios "0.19.0" cli-color "2.0.0" uuid "3.3.3" -"@nestjs/core@6.9.0": - version "6.9.0" - resolved "https://registry.yarnpkg.com/@nestjs/core/-/core-6.9.0.tgz#cf4ceb19b699f838f2dc5bc46797f6e357f2cfc8" - integrity sha512-DvzhJQAs8dXzlJ1Ru6YPHQM+7ODQ2Ity5Ad6OU1YHqQp2GJEwa+6uzCVL/37zu7i/FhUfgLCZLcwHhofRPcgkg== +"@nestjs/core@6.10.12": + version "6.10.12" + resolved "https://registry.yarnpkg.com/@nestjs/core/-/core-6.10.12.tgz#546363837e3cf9a4467e24c4cd8902e9a492668b" + integrity sha512-pZ+KK7BQ9OH6ZSQoakbodFTuoUqa14WDJ/d1GQZbDpkGGOv26rmitjHmuBRMkttkHHsSvLtU1Y2e4vtMFevDcA== dependencies: "@nuxtjs/opencollective" "0.2.2" fast-safe-stringify "2.0.7" iterare "1.2.0" - object-hash "2.0.0" + object-hash "2.0.1" uuid "3.3.3" -"@nestjs/microservices@6.9.0": - version "6.9.0" - resolved "https://registry.yarnpkg.com/@nestjs/microservices/-/microservices-6.9.0.tgz#b8d43d628c5a71db650ebf76243ef9132ff6450b" - integrity sha512-HxnqnJFkXP0vuywzFrO/hhgAAi73hw0lg3yWQdxBKUsAwlP1my9tMmVxo4hr80ilLiTzgjWnoRuAus//bFytLg== +"@nestjs/microservices@6.10.12": + version "6.10.12" + resolved "https://registry.yarnpkg.com/@nestjs/microservices/-/microservices-6.10.12.tgz#88cefaa363e4b20919a3391087e982da52ad02bf" + integrity sha512-I7eYZpeVNCt2LM6PmlVWWVRvoOeJz00jABcLo+KKBnlo2pIWCH7W84wZslSWSJPi2q11j6tWvfAtUdR29pX53Q== dependencies: iterare "1.2.0" json-socket "0.3.0" -"@nestjs/platform-express@^6.9.0": - version "6.9.0" - resolved "https://registry.yarnpkg.com/@nestjs/platform-express/-/platform-express-6.9.0.tgz#4d517ce7ba12c96daa27bde159fb0d915391f71a" - integrity sha512-r44oHpTmUUW488BQh3v9ZxABs+xhlnTAb/SWO4rAI4MLTskJ0qpgJ6IPKCUgphiecl50A5O3x9kWJ3tYDWuy/w== +"@nestjs/platform-express@^6.10.12": + version "6.10.12" + resolved "https://registry.yarnpkg.com/@nestjs/platform-express/-/platform-express-6.10.12.tgz#e8c3189a5b3761c05ec0bbc40db0573b5070e9cc" + integrity sha512-XNW2tBIF229OQtZ+rcBLGcqh7QM2C3et7zry/af+nWNecKCiORyDNAQBAi8r6n6BtQJUv4Po7Wts5CGJz0Nldw== dependencies: body-parser "1.19.0" cors "2.8.5" express "4.17.1" multer "1.4.2" -"@nestjs/testing@6.9.0": - version "6.9.0" - resolved "https://registry.yarnpkg.com/@nestjs/testing/-/testing-6.9.0.tgz#7d806c53555d1f2a89cd061dc7e476c04441fe89" - integrity sha512-vCxUiu5XnPhaQ3RFHQmj04mnQvFHPGAyTEds/2/EpXCPmRbnqPl67NrA1xDTI8GX8sy+yg4DHMPrT7IjQwZfzg== +"@nestjs/testing@6.10.12": + version "6.10.12" + resolved "https://registry.yarnpkg.com/@nestjs/testing/-/testing-6.10.12.tgz#146c90bf0c8b2575e5eaa11ec86007147a063cf9" + integrity sha512-dHCv7FFnZingfUeuQKO3ybbC/kWyrP2W/i7SSn04+ntvKDzd9cysNgEH1YCdUjH2HeADy21H0CK2Q8OqY8ROTA== dependencies: optional "0.1.4" -"@nestjs/websockets@6.9.0": - version "6.9.0" - resolved "https://registry.yarnpkg.com/@nestjs/websockets/-/websockets-6.9.0.tgz#9ad17a34e3a3405ba4fbda49ab7e118c90ce2c44" - integrity sha512-6dsB4V2ovdDFSF80wTDb6C28KJ6gNpbag0q/e4rReFc+uIOPvm1PVFQt6Q2g/0OZfhnwVXjBGRNgtCF0pXDZCQ== +"@nestjs/websockets@6.10.12": + version "6.10.12" + resolved "https://registry.yarnpkg.com/@nestjs/websockets/-/websockets-6.10.12.tgz#18b2171d193f522cb3457a52d8f26ec1adf084c0" + integrity sha512-E9CmcuEB/3xZym9uXAINHUdCwGkpJEXPwWVxWJ9kxKxlDPU+mM25ox1HL+hnAjX70GjIe6PLjf/8V8SXCCiPdg== dependencies: iterare "1.2.0" @@ -3473,10 +3473,10 @@ object-copy@^0.1.0: define-property "^0.2.5" kind-of "^3.0.3" -object-hash@2.0.0: - version "2.0.0" - resolved "https://registry.yarnpkg.com/object-hash/-/object-hash-2.0.0.tgz#7c4cc341eb8b53367312a7c546142f00c9e0ea20" - integrity sha512-I7zGBH0rDKwVGeGZpZoFaDhIwvJa3l1CZE+8VchylXbInNiCj7sxxea9P5dTM4ftKR5//nrqxrdeGSTWL2VpBA== +object-hash@2.0.1: + version "2.0.1" + resolved "https://registry.yarnpkg.com/object-hash/-/object-hash-2.0.1.tgz#cef18a0c940cc60aa27965ecf49b782cbf101d96" + integrity sha512-HgcGMooY4JC2PBt9sdUdJ6PMzpin+YtY3r/7wg0uTifP+HJWW8rammseSEHuyt0UeShI183UGssCJqm1bJR7QA== object-keys@^1.0.12: version "1.1.0" @@ -4042,10 +4042,10 @@ rsvp@^4.8.4: resolved "https://registry.yarnpkg.com/rsvp/-/rsvp-4.8.4.tgz#b50e6b34583f3dd89329a2f23a8a2be072845911" integrity sha512-6FomvYPfs+Jy9TfXmBpBuMWNH94SgCsZmJKcanySzgNNP6LjWxBvyLTa9KaMfDDM5oxRfrKDB0r/qeRsLwnBfA== -rxjs@6.4.0: - version "6.4.0" - resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.4.0.tgz#f3bb0fe7bda7fb69deac0c16f17b50b0b8790504" - integrity sha512-Z9Yfa11F6B9Sg/BK9MnqnQ+aQYicPLtilXBp2yUtDt2JRCE0h26d33EnfO3ZxoNxG0T92OUucP3Ct7cpfkdFfw== +rxjs@6.5.3: + version "6.5.3" + resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.5.3.tgz#510e26317f4db91a7eb1de77d9dd9ba0a4899a3a" + integrity sha512-wuYsAYYFdWTAnAaPoKGNhfpWwKZbJW+HgAJ+mImp+Epl7BG8oNWBCTyRM8gba9k4lk8BgWdoYm21Mo/RYhhbgA== dependencies: tslib "^1.9.0" diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts index 9c7d1223b..85a5b6034 100644 --- a/packages/rabbitmq/src/amqp/connection.ts +++ b/packages/rabbitmq/src/amqp/connection.ts @@ -106,7 +106,11 @@ export class AmqpConnection { msgOptions.queueOptions || undefined ); - await this.channel.bindQueue(queue, exchange, routingKey); + const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey]; + + await Promise.all( + routingKeys.map(x => this.channel.bindQueue(queue, exchange, x)) + ); await this.channel.consume(queue, async msg => { try { @@ -166,7 +170,11 @@ export class AmqpConnection { rpcOptions.queueOptions || undefined ); - await this.channel.bindQueue(queue, exchange, routingKey); + const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey]; + + await Promise.all( + routingKeys.map(x => this.channel.bindQueue(queue, exchange, x)) + ); await this.channel.consume(queue, async msg => { try { diff --git a/packages/rabbitmq/src/rabbitmq.interfaces.ts b/packages/rabbitmq/src/rabbitmq.interfaces.ts index 69f1e2fc4..1f522641e 100644 --- a/packages/rabbitmq/src/rabbitmq.interfaces.ts +++ b/packages/rabbitmq/src/rabbitmq.interfaces.ts @@ -39,7 +39,7 @@ export enum MessageHandlerErrorBehavior { export interface MessageHandlerOptions { exchange: string; - routingKey: string; + routingKey: string | string[]; queue?: string; queueOptions?: QueueOptions; errorBehavior?: MessageHandlerErrorBehavior;