diff --git a/src/common/rabbitmq/rabbitmq.service.spec.ts b/src/common/rabbitmq/rabbitmq.service.spec.ts new file mode 100644 index 000000000..3b172453f --- /dev/null +++ b/src/common/rabbitmq/rabbitmq.service.spec.ts @@ -0,0 +1,154 @@ +import { ConfigService } from "@nestjs/config"; +import { Test, TestingModule } from "@nestjs/testing"; +import { RabbitMQService } from "./rabbitmq.service"; +import { connect as amqplibConnect } from "amqplib"; + +jest.mock("amqplib", () => ({ + connect: jest.fn(), +})); + +describe("RabbitMQService", () => { + let rabbitMQService: RabbitMQService; + + class ConfigServiceMock { + get(key: string) { + switch (key) { + case "rabbitMq.hostname": + return "localhost"; + case "rabbitMq.port": + return 5672; + case "rabbitMq.username": + return "guest"; + case "rabbitMq.password": + return "guest"; + default: + return undefined; + } + } + } + + const mockChannel = { + assertQueue: jest.fn(), + assertExchange: jest.fn(), + bindQueue: jest.fn(), + publish: jest.fn(), + close: jest.fn(), + }; + + const mockConnection = { + createChannel: jest.fn().mockResolvedValue(mockChannel), + close: jest.fn(), + }; + + beforeEach(async () => { + (amqplibConnect as jest.Mock).mockResolvedValue(mockConnection); + const module: TestingModule = await Test.createTestingModule({ + providers: [ + RabbitMQService, + { provide: ConfigService, useClass: ConfigServiceMock }, + ], + }).compile(); + rabbitMQService = module.get(RabbitMQService); + }); + + it("should be defined", () => { + expect(rabbitMQService).toBeDefined(); + }); + + it("should getValueFromConfig", () => { + const port = rabbitMQService.getValueFromConfig("port"); + expect(port).toEqual(5672); + }); + + it("should getValueFromConfig but throw error", () => { + expect(() => rabbitMQService.getValueFromConfig("notExist")).toThrowError( + "RabbitMQ is enabled but missing the config variable notExist.", + ); + }); + + it("should parseConfig", () => { + rabbitMQService.parseConfig(); + expect(rabbitMQService["connectionOptions"]).toEqual({ + hostname: "localhost", + port: 5672, + username: "guest", + password: "guest", + protocol: "amqp", + }); + }); + + it("should parseConfig but throw error", () => { + jest + .spyOn(rabbitMQService["configService"], "get") + .mockReturnValue(undefined); + expect(() => rabbitMQService.parseConfig()).toThrowError( + "RabbitMQ is enabled but missing the config variable hostname.", + ); + }); + + it("should connect", async () => { + await rabbitMQService["connect"](); + expect(amqplibConnect).toHaveBeenCalledWith( + rabbitMQService["connectionOptions"], + ); + expect(mockConnection.createChannel).toHaveBeenCalled(); + expect(rabbitMQService["channel"]).toBe(mockChannel); + expect(rabbitMQService["connection"]).toBe(mockConnection); + }); + + it("should connect but log error", async () => { + (amqplibConnect as jest.Mock).mockRejectedValueOnce( + new Error("Connection error"), + ); + await expect(rabbitMQService["connect"]()).rejects.toThrowError( + "Cannot connect to RabbitMQ", + ); + }); + + it("should bindQueue", async () => { + await rabbitMQService["connect"](); + await rabbitMQService["bindQueue"]("q", "ex", "key"); + expect(mockChannel.assertQueue).toHaveBeenCalledWith("q", { + durable: true, + }); + expect(mockChannel.assertExchange).toHaveBeenCalledWith("ex", "topic", { + durable: true, + }); + expect(mockChannel.bindQueue).toHaveBeenCalledWith("q", "ex", "key"); + }); + + it("should bindQueue but throw error", async () => { + mockChannel.assertQueue.mockRejectedValueOnce(new Error("Queue error")); + await expect( + rabbitMQService["bindQueue"]("bad-queue", "ex", "key"), + ).rejects.toThrowError( + "Could not connect to RabbitMQ queue bad-queue with exchange ex and key key.", + ); + }); + + it("should sendMessage", async () => { + await rabbitMQService["connect"](); + jest + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .spyOn(rabbitMQService, "bindQueue") + .mockResolvedValue(undefined); + await rabbitMQService.sendMessage("q", "ex", "key", "msg"); + expect(mockChannel.publish).toHaveBeenCalledWith( + "ex", + "key", + Buffer.from("msg"), + { persistent: true }, + ); + }); + + it("should sendMessage but throw error", async () => { + jest + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .spyOn(rabbitMQService, "bindQueue") + .mockResolvedValue(undefined); + mockChannel.publish.mockRejectedValueOnce(new Error("Publish error")); + await expect( + rabbitMQService.sendMessage("q", "ex", "key", "msg"), + ).rejects.toThrowError("Could not send message to RabbitMQ queue q."); + }); +}); diff --git a/src/common/rabbitmq/rabbitmq.service.ts b/src/common/rabbitmq/rabbitmq.service.ts index 67642ba12..b4c081c38 100644 --- a/src/common/rabbitmq/rabbitmq.service.ts +++ b/src/common/rabbitmq/rabbitmq.service.ts @@ -1,112 +1,135 @@ -import amqp, { Connection, Channel } from "amqplib/callback_api"; +import { + connect as amqplibConnect, + Options, + ChannelModel, + Channel, +} from "amqplib"; import { Injectable, Logger, OnModuleDestroy, OnApplicationShutdown, + OnModuleInit, } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; +function OnError(action: "throw" | "log" = "throw"): MethodDecorator { + return function ( + target: object, + propertyKey: string | symbol, + descriptor: PropertyDescriptor, + ) { + const originalMethod = descriptor.value; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + descriptor.value = async function (...args: any[]) { + try { + return await originalMethod.apply(this, args); + } catch (error) { + if (action === "log") { + Logger.error( + (this as RabbitMQService)._error + + JSON.stringify((error as Error).message), + "RabbitMQService", + ); + } else { + throw new Error((this as RabbitMQService)._error, { cause: error }); + } + } + }; + return descriptor; + }; +} + /** * Service for publishing messages to a RabbitMQ queue. */ @Injectable() -export class RabbitMQService implements OnModuleDestroy, OnApplicationShutdown { - private connectionOptions: amqp.Options.Connect; - private connection: Connection; +export class RabbitMQService + implements OnModuleInit, OnModuleDestroy, OnApplicationShutdown +{ + private connection: ChannelModel; private channel: Channel; + private static readonly configKeys = [ + "hostname", + "port", + "username", + "password", + ] as const; + private connectionOptions: Required< + Pick< + Options.Connect, + (typeof RabbitMQService.configKeys)[number] | "protocol" + > + >; + _error: string; - constructor(private readonly configService: ConfigService) { - Logger.log("Initializing RabbitMQService.", "RabbitMQService"); - - const hostname = this.configService.get("rabbitMq.hostname"); - const port = this.configService.get("rabbitMq.port"); - const username = this.configService.get("rabbitMq.username"); - const password = this.configService.get("rabbitMq.password"); + constructor(private readonly configService: ConfigService) {} - if (!hostname || !port || !username || !password) { - Logger.error( - "RabbitMQ is enabled but missing one or more config variables.", - "RabbitMQService", + getValueFromConfig(key: string): string | number { + const configValue = this.configService.get(`rabbitMq.${key}`); + if (!configValue) + throw new Error( + `RabbitMQ is enabled but missing the config variable ${key}.`, ); - } else { - this.connectionOptions = { - protocol: "amqp", - hostname: hostname, - port: port, - username: username, - password: password, - }; + return configValue; + } - amqp.connect( - this.connectionOptions, - (connectionError: Error, connection: Connection) => { - if (connectionError) { - Logger.error( - "Connection error in RabbitMQService: " + - JSON.stringify(connectionError.message), - "RabbitMQService", - ); - return; - } - this.connection = connection; + parseConfig(): void { + const connectionOptions: Record = {}; + RabbitMQService.configKeys.forEach( + (configKey) => + (connectionOptions[configKey] = this.getValueFromConfig(configKey)), + ); + connectionOptions.protocol = "amqp"; + this.connectionOptions = connectionOptions as typeof this.connectionOptions; + } - this.connection.createChannel( - (channelError: Error, channel: Channel) => { - if (channelError) { - Logger.error( - "Channel error in RabbitMQService: " + - JSON.stringify(channelError.message), - "RabbitMQService", - ); - return; - } - this.channel = channel; - }, - ); - }, - ); - } + async onModuleInit(): Promise { + Logger.log("Initializing RabbitMQService.", "RabbitMQService"); + this.parseConfig(); + await this.connect(); } - private connect(queue: string, exchange: string, key: string) { - try { - this.channel.assertQueue(queue, { durable: true }); - this.channel.assertExchange(exchange, "topic", { - durable: true, - }); - this.channel.bindQueue(queue, exchange, key); - } catch (error) { - throw new Error( - `Could not connect to RabbitMQ queue ${queue} with exchange ${exchange} and key ${key}.`, - { cause: error }, - ); - } + @OnError() + private async connect(): Promise { + this._error = "Cannot connect to RabbitMQ"; + this.connection = await amqplibConnect(this.connectionOptions); + this._error = "Channel error in RabbitMQService: "; + this.channel = await this.connection.createChannel(); } - sendMessage(queue: string, exchange: string, key: string, message: string) { - try { - this.connect(queue, exchange, key); - this.channel.sendToQueue(queue, Buffer.from(message), { - persistent: true, - }); - } catch (error) { - throw new Error(`Could not send message to RabbitMQ queue ${queue}.`, { - cause: error, - }); - } + @OnError() + private async bindQueue(queue: string, exchange: string, key: string) { + this._error = `Could not connect to RabbitMQ queue ${queue} with exchange ${exchange} and key ${key}.`; + await this.channel.assertQueue(queue, { durable: true }); + await this.channel.assertExchange(exchange, "topic", { + durable: true, + }); + await this.channel.bindQueue(queue, exchange, key); + } + + @OnError() + async sendMessage( + queue: string, + exchange: string, + key: string, + message: string, + ) { + this._error = `Could not send message to RabbitMQ queue ${queue}.`; + await this.bindQueue(queue, exchange, key); + this.channel.publish(exchange, key, Buffer.from(message), { + persistent: true, + }); } async close(): Promise { if (this.channel) { - await this.channel.close(() => { - Logger.log("RabbitMQ channel closed.", "RabbitMQService"); - }); + await this.channel.close(); + Logger.log("RabbitMQ channel closed.", "RabbitMQService"); } if (this.connection) { - await this.connection.close(() => { - Logger.log("RabbitMQ connection closed.", "RabbitMQService"); - }); + await this.connection.close(); + Logger.log("RabbitMQ connection closed.", "RabbitMQService"); } } diff --git a/src/config/job-config/actions/rabbitmqaction/rabbitmqaction.ts b/src/config/job-config/actions/rabbitmqaction/rabbitmqaction.ts index e1693c738..de54690fb 100644 --- a/src/config/job-config/actions/rabbitmqaction/rabbitmqaction.ts +++ b/src/config/job-config/actions/rabbitmqaction/rabbitmqaction.ts @@ -43,7 +43,7 @@ export class RabbitMQJobAction implements JobAction { `(Job ${context.job.id}) Performing RabbitMQJobAction`, "RabbitMQJobAction", ); - this.rabbitMQService.sendMessage( + await this.rabbitMQService.sendMessage( this.queue, this.exchange, this.key,