Skip to content

fix: send rabbitmq message to exchange and refactor #1872

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions src/common/rabbitmq/rabbitmq.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { ConfigService } from "@nestjs/config";
import { Test, TestingModule } from "@nestjs/testing";
import { RabbitMQService } from "./rabbitmq.service";
import { connect as amqplibConnect } from "amqplib";

jest.mock("amqplib", () => ({
connect: jest.fn(),
}));

describe("RabbitMQService", () => {
let rabbitMQService: RabbitMQService;

class ConfigServiceMock {
get(key: string) {
switch (key) {
case "rabbitMq.hostname":
return "localhost";
case "rabbitMq.port":
return 5672;
case "rabbitMq.username":
return "guest";
case "rabbitMq.password":
return "guest";
default:
return undefined;
}
}
}

const mockChannel = {
assertQueue: jest.fn(),
assertExchange: jest.fn(),
bindQueue: jest.fn(),
publish: jest.fn(),
close: jest.fn(),
};

const mockConnection = {
createChannel: jest.fn().mockResolvedValue(mockChannel),
close: jest.fn(),
};

beforeEach(async () => {
(amqplibConnect as jest.Mock).mockResolvedValue(mockConnection);
const module: TestingModule = await Test.createTestingModule({
providers: [
RabbitMQService,
{ provide: ConfigService, useClass: ConfigServiceMock },
],
}).compile();
rabbitMQService = module.get<RabbitMQService>(RabbitMQService);
});

it("should be defined", () => {
expect(rabbitMQService).toBeDefined();
});

it("should getValueFromConfig", () => {
const port = rabbitMQService.getValueFromConfig("port");
expect(port).toEqual(5672);
});

it("should getValueFromConfig but throw error", () => {
expect(() => rabbitMQService.getValueFromConfig("notExist")).toThrowError(
"RabbitMQ is enabled but missing the config variable notExist.",
);
});

it("should parseConfig", () => {
rabbitMQService.parseConfig();
expect(rabbitMQService["connectionOptions"]).toEqual({
hostname: "localhost",
port: 5672,
username: "guest",
password: "guest",
protocol: "amqp",
});
});

it("should parseConfig but throw error", () => {
jest
.spyOn(rabbitMQService["configService"], "get")
.mockReturnValue(undefined);
expect(() => rabbitMQService.parseConfig()).toThrowError(
"RabbitMQ is enabled but missing the config variable hostname.",
);
});

it("should connect", async () => {
await rabbitMQService["connect"]();
expect(amqplibConnect).toHaveBeenCalledWith(
rabbitMQService["connectionOptions"],
);
expect(mockConnection.createChannel).toHaveBeenCalled();
expect(rabbitMQService["channel"]).toBe(mockChannel);
expect(rabbitMQService["connection"]).toBe(mockConnection);
});

it("should connect but log error", async () => {
(amqplibConnect as jest.Mock).mockRejectedValueOnce(
new Error("Connection error"),
);
await expect(rabbitMQService["connect"]()).rejects.toThrowError(
"Cannot connect to RabbitMQ",
);
});

it("should bindQueue", async () => {
await rabbitMQService["connect"]();
await rabbitMQService["bindQueue"]("q", "ex", "key");
expect(mockChannel.assertQueue).toHaveBeenCalledWith("q", {
durable: true,
});
expect(mockChannel.assertExchange).toHaveBeenCalledWith("ex", "topic", {
durable: true,
});
expect(mockChannel.bindQueue).toHaveBeenCalledWith("q", "ex", "key");
});

it("should bindQueue but throw error", async () => {
mockChannel.assertQueue.mockRejectedValueOnce(new Error("Queue error"));
await expect(
rabbitMQService["bindQueue"]("bad-queue", "ex", "key"),
).rejects.toThrowError(
"Could not connect to RabbitMQ queue bad-queue with exchange ex and key key.",
);
});

it("should sendMessage", async () => {
await rabbitMQService["connect"]();
jest
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.spyOn<any, string>(rabbitMQService, "bindQueue")
.mockResolvedValue(undefined);
await rabbitMQService.sendMessage("q", "ex", "key", "msg");
expect(mockChannel.publish).toHaveBeenCalledWith(
"ex",
"key",
Buffer.from("msg"),
{ persistent: true },
);
});

it("should sendMessage but throw error", async () => {
jest
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.spyOn<any, string>(rabbitMQService, "bindQueue")
.mockResolvedValue(undefined);
mockChannel.publish.mockRejectedValueOnce(new Error("Publish error"));
await expect(
rabbitMQService.sendMessage("q", "ex", "key", "msg"),
).rejects.toThrowError("Could not send message to RabbitMQ queue q.");
});
});
185 changes: 104 additions & 81 deletions src/common/rabbitmq/rabbitmq.service.ts
Original file line number Diff line number Diff line change
@@ -1,112 +1,135 @@
import amqp, { Connection, Channel } from "amqplib/callback_api";
import {
connect as amqplibConnect,
Options,
ChannelModel,
Channel,
} from "amqplib";
import {
Injectable,
Logger,
OnModuleDestroy,
OnApplicationShutdown,
OnModuleInit,
} from "@nestjs/common";
import { ConfigService } from "@nestjs/config";

function OnError(action: "throw" | "log" = "throw"): MethodDecorator {
return function (
target: object,
propertyKey: string | symbol,
descriptor: PropertyDescriptor,
) {
const originalMethod = descriptor.value;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
descriptor.value = async function (...args: any[]) {
try {
return await originalMethod.apply(this, args);
} catch (error) {
if (action === "log") {
Logger.error(
(this as RabbitMQService)._error +
JSON.stringify((error as Error).message),
"RabbitMQService",
);
} else {
throw new Error((this as RabbitMQService)._error, { cause: error });
}
}
};
return descriptor;
};
}

/**
* Service for publishing messages to a RabbitMQ queue.
*/
@Injectable()
export class RabbitMQService implements OnModuleDestroy, OnApplicationShutdown {
private connectionOptions: amqp.Options.Connect;
private connection: Connection;
export class RabbitMQService
implements OnModuleInit, OnModuleDestroy, OnApplicationShutdown
{
private connection: ChannelModel;
private channel: Channel;
private static readonly configKeys = [
"hostname",
"port",
"username",
"password",
] as const;
private connectionOptions: Required<
Pick<
Options.Connect,
(typeof RabbitMQService.configKeys)[number] | "protocol"
>
>;
_error: string;

constructor(private readonly configService: ConfigService) {
Logger.log("Initializing RabbitMQService.", "RabbitMQService");

const hostname = this.configService.get<string>("rabbitMq.hostname");
const port = this.configService.get<number>("rabbitMq.port");
const username = this.configService.get<string>("rabbitMq.username");
const password = this.configService.get<string>("rabbitMq.password");
constructor(private readonly configService: ConfigService) {}

if (!hostname || !port || !username || !password) {
Logger.error(
"RabbitMQ is enabled but missing one or more config variables.",
"RabbitMQService",
Comment on lines -19 to -30
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the old code, as you can see the required fields immediately, but I'm not against having it programmatically either

getValueFromConfig(key: string): string | number {
const configValue = this.configService.get(`rabbitMq.${key}`);
if (!configValue)
throw new Error(
`RabbitMQ is enabled but missing the config variable ${key}.`,
);
} else {
this.connectionOptions = {
protocol: "amqp",
hostname: hostname,
port: port,
username: username,
password: password,
};
return configValue;
}

amqp.connect(
this.connectionOptions,
(connectionError: Error, connection: Connection) => {
if (connectionError) {
Logger.error(
"Connection error in RabbitMQService: " +
JSON.stringify(connectionError.message),
"RabbitMQService",
);
return;
}
this.connection = connection;
parseConfig(): void {
const connectionOptions: Record<string, string | number> = {};
RabbitMQService.configKeys.forEach(
(configKey) =>
(connectionOptions[configKey] = this.getValueFromConfig(configKey)),
);
connectionOptions.protocol = "amqp";
this.connectionOptions = connectionOptions as typeof this.connectionOptions;
}

this.connection.createChannel(
(channelError: Error, channel: Channel) => {
if (channelError) {
Logger.error(
"Channel error in RabbitMQService: " +
JSON.stringify(channelError.message),
"RabbitMQService",
);
return;
}
this.channel = channel;
},
);
},
);
}
async onModuleInit(): Promise<void> {
Logger.log("Initializing RabbitMQService.", "RabbitMQService");
this.parseConfig();
await this.connect();
}

private connect(queue: string, exchange: string, key: string) {
try {
this.channel.assertQueue(queue, { durable: true });
this.channel.assertExchange(exchange, "topic", {
durable: true,
});
this.channel.bindQueue(queue, exchange, key);
} catch (error) {
throw new Error(
`Could not connect to RabbitMQ queue ${queue} with exchange ${exchange} and key ${key}.`,
{ cause: error },
);
}
@OnError()
private async connect(): Promise<void> {
this._error = "Cannot connect to RabbitMQ";
this.connection = await amqplibConnect(this.connectionOptions);
this._error = "Channel error in RabbitMQService: ";
this.channel = await this.connection.createChannel();
Comment on lines +93 to +98
Copy link
Member

@Junjiequan Junjiequan May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my view, the OnError decorator brings unnecessary complexities. I personally would use try catch or just .catch() here to make it easy to read. But the idea here is to make it DRY I guess.

}

sendMessage(queue: string, exchange: string, key: string, message: string) {
try {
this.connect(queue, exchange, key);
this.channel.sendToQueue(queue, Buffer.from(message), {
persistent: true,
});
} catch (error) {
throw new Error(`Could not send message to RabbitMQ queue ${queue}.`, {
cause: error,
});
}
@OnError()
private async bindQueue(queue: string, exchange: string, key: string) {
this._error = `Could not connect to RabbitMQ queue ${queue} with exchange ${exchange} and key ${key}.`;
await this.channel.assertQueue(queue, { durable: true });
await this.channel.assertExchange(exchange, "topic", {
durable: true,
});
await this.channel.bindQueue(queue, exchange, key);
}

@OnError()
async sendMessage(
queue: string,
exchange: string,
key: string,
message: string,
) {
this._error = `Could not send message to RabbitMQ queue ${queue}.`;
await this.bindQueue(queue, exchange, key);
this.channel.publish(exchange, key, Buffer.from(message), {
persistent: true,
});
}

async close(): Promise<void> {
if (this.channel) {
await this.channel.close(() => {
Logger.log("RabbitMQ channel closed.", "RabbitMQService");
});
await this.channel.close();
Logger.log("RabbitMQ channel closed.", "RabbitMQService");
}
if (this.connection) {
await this.connection.close(() => {
Logger.log("RabbitMQ connection closed.", "RabbitMQService");
});
await this.connection.close();
Logger.log("RabbitMQ connection closed.", "RabbitMQService");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class RabbitMQJobAction<T extends JobDto> implements JobAction<T> {
`(Job ${context.job.id}) Performing RabbitMQJobAction`,
"RabbitMQJobAction",
);
this.rabbitMQService.sendMessage(
await this.rabbitMQService.sendMessage(
this.queue,
this.exchange,
this.key,
Expand Down