Skip to content

Commit 0307a54

Browse files
committed
Refactor rabbitmq service with tests
1 parent a440a79 commit 0307a54

File tree

2 files changed

+262
-81
lines changed

2 files changed

+262
-81
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import { ConfigService } from "@nestjs/config";
2+
import { Test, TestingModule } from "@nestjs/testing";
3+
import { RabbitMQService } from "./rabbitmq.service";
4+
import { connect as amqplibConnect } from "amqplib";
5+
import { Logger } from "@nestjs/common";
6+
7+
jest.mock("amqplib", () => ({
8+
connect: jest.fn(),
9+
}));
10+
11+
describe("RabbitMQService", () => {
12+
let rabbitMQService: RabbitMQService;
13+
14+
class ConfigServiceMock {
15+
get(key: string) {
16+
switch (key) {
17+
case "rabbitMq.hostname":
18+
return "localhost";
19+
case "rabbitMq.port":
20+
return 5672;
21+
case "rabbitMq.username":
22+
return "guest";
23+
case "rabbitMq.password":
24+
return "guest";
25+
default:
26+
return undefined;
27+
}
28+
}
29+
}
30+
31+
const mockChannel = {
32+
assertQueue: jest.fn(),
33+
assertExchange: jest.fn(),
34+
bindQueue: jest.fn(),
35+
publish: jest.fn(),
36+
close: jest.fn(),
37+
};
38+
39+
const mockConnection = {
40+
createChannel: jest.fn().mockResolvedValue(mockChannel),
41+
close: jest.fn(),
42+
};
43+
44+
beforeEach(async () => {
45+
(amqplibConnect as jest.Mock).mockResolvedValue(mockConnection);
46+
const module: TestingModule = await Test.createTestingModule({
47+
providers: [
48+
RabbitMQService,
49+
{ provide: ConfigService, useClass: ConfigServiceMock },
50+
],
51+
}).compile();
52+
rabbitMQService = module.get<RabbitMQService>(RabbitMQService);
53+
});
54+
55+
it("should be defined", () => {
56+
expect(rabbitMQService).toBeDefined();
57+
});
58+
59+
it("should getValueFromConfig", () => {
60+
const port = rabbitMQService.getValueFromConfig("port");
61+
expect(port).toEqual(5672);
62+
});
63+
64+
it("should getValueFromConfig but throw error", () => {
65+
expect(() => rabbitMQService.getValueFromConfig("notExist")).toThrowError(
66+
"RabbitMQ is enabled but missing the config variable notExist.",
67+
);
68+
});
69+
70+
it("should parseConfig", () => {
71+
rabbitMQService.parseConfig();
72+
expect(rabbitMQService["connectionOptions"]).toEqual({
73+
hostname: "localhost",
74+
port: 5672,
75+
username: "guest",
76+
password: "guest",
77+
protocol: "amqp",
78+
});
79+
});
80+
81+
it("should parseConfig but throw error", () => {
82+
jest
83+
.spyOn(rabbitMQService["configService"], "get")
84+
.mockReturnValue(undefined);
85+
expect(() => rabbitMQService.parseConfig()).toThrowError(
86+
"RabbitMQ is enabled but missing the config variable hostname.",
87+
);
88+
});
89+
90+
it("should connect", async () => {
91+
await rabbitMQService["connect"]();
92+
expect(amqplibConnect).toHaveBeenCalledWith(
93+
rabbitMQService["connectionOptions"],
94+
);
95+
expect(mockConnection.createChannel).toHaveBeenCalled();
96+
expect(rabbitMQService["channel"]).toBe(mockChannel);
97+
expect(rabbitMQService["connection"]).toBe(mockConnection);
98+
});
99+
100+
it("should connect but log error", async () => {
101+
(amqplibConnect as jest.Mock).mockRejectedValueOnce(
102+
new Error("Connection error"),
103+
);
104+
const errorSpy = jest.spyOn(Logger, "error");
105+
await rabbitMQService["connect"]();
106+
expect(errorSpy).toHaveBeenCalledWith(
107+
"Cannot connect to RabbitMQ: " + '"Connection error"',
108+
"RabbitMQService",
109+
);
110+
});
111+
112+
it("should bindQueue", async () => {
113+
await rabbitMQService["connect"]();
114+
await rabbitMQService["bindQueue"]("q", "ex", "key");
115+
expect(mockChannel.assertQueue).toHaveBeenCalledWith("q", {
116+
durable: true,
117+
});
118+
expect(mockChannel.assertExchange).toHaveBeenCalledWith("ex", "topic", {
119+
durable: true,
120+
});
121+
expect(mockChannel.bindQueue).toHaveBeenCalledWith("q", "ex", "key");
122+
});
123+
124+
it("should bindQueue but throw error", async () => {
125+
mockChannel.assertQueue.mockRejectedValueOnce(new Error("Queue error"));
126+
await expect(
127+
rabbitMQService["bindQueue"]("bad-queue", "ex", "key"),
128+
).rejects.toThrowError(
129+
"Could not connect to RabbitMQ queue bad-queue with exchange ex and key key.",
130+
);
131+
});
132+
133+
it("should sendMessage", async () => {
134+
await rabbitMQService["connect"]();
135+
jest
136+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
137+
.spyOn<any, string>(rabbitMQService, "bindQueue")
138+
.mockResolvedValue(undefined);
139+
await rabbitMQService.sendMessage("q", "ex", "key", "msg");
140+
expect(mockChannel.publish).toHaveBeenCalledWith(
141+
"ex",
142+
"key",
143+
Buffer.from("msg"),
144+
{ persistent: true },
145+
);
146+
});
147+
148+
it("should sendMessage but throw error", async () => {
149+
jest
150+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
151+
.spyOn<any, string>(rabbitMQService, "bindQueue")
152+
.mockResolvedValue(undefined);
153+
mockChannel.publish.mockRejectedValueOnce(new Error("Publish error"));
154+
await expect(
155+
rabbitMQService.sendMessage("q", "ex", "key", "msg"),
156+
).rejects.toThrowError("Could not send message to RabbitMQ queue q.");
157+
});
158+
});

src/common/rabbitmq/rabbitmq.service.ts

Lines changed: 104 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,112 +1,135 @@
1-
import amqp, { Connection, Channel } from "amqplib/callback_api";
1+
import {
2+
connect as amqplibConnect,
3+
Options,
4+
ChannelModel,
5+
Channel,
6+
} from "amqplib";
27
import {
38
Injectable,
49
Logger,
510
OnModuleDestroy,
611
OnApplicationShutdown,
12+
OnModuleInit,
713
} from "@nestjs/common";
814
import { ConfigService } from "@nestjs/config";
915

16+
function OnError(action: "throw" | "log" = "throw"): MethodDecorator {
17+
return function (
18+
target: object,
19+
propertyKey: string | symbol,
20+
descriptor: PropertyDescriptor,
21+
) {
22+
const originalMethod = descriptor.value;
23+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
24+
descriptor.value = async function (...args: any[]) {
25+
try {
26+
return await originalMethod.apply(this, args);
27+
} catch (error) {
28+
if (action === "log") {
29+
Logger.error(
30+
(this as RabbitMQService)._error +
31+
JSON.stringify((error as Error).message),
32+
"RabbitMQService",
33+
);
34+
} else {
35+
throw new Error((this as RabbitMQService)._error, { cause: error });
36+
}
37+
}
38+
};
39+
return descriptor;
40+
};
41+
}
42+
1043
/**
1144
* Service for publishing messages to a RabbitMQ queue.
1245
*/
1346
@Injectable()
14-
export class RabbitMQService implements OnModuleDestroy, OnApplicationShutdown {
15-
private connectionOptions: amqp.Options.Connect;
16-
private connection: Connection;
47+
export class RabbitMQService
48+
implements OnModuleInit, OnModuleDestroy, OnApplicationShutdown
49+
{
50+
private connection: ChannelModel;
1751
private channel: Channel;
52+
private static readonly configKeys = [
53+
"hostname",
54+
"port",
55+
"username",
56+
"password",
57+
] as const;
58+
private connectionOptions: Required<
59+
Pick<
60+
Options.Connect,
61+
(typeof RabbitMQService.configKeys)[number] | "protocol"
62+
>
63+
>;
64+
_error: string;
1865

19-
constructor(private readonly configService: ConfigService) {
20-
Logger.log("Initializing RabbitMQService.", "RabbitMQService");
21-
22-
const hostname = this.configService.get<string>("rabbitMq.hostname");
23-
const port = this.configService.get<number>("rabbitMq.port");
24-
const username = this.configService.get<string>("rabbitMq.username");
25-
const password = this.configService.get<string>("rabbitMq.password");
66+
constructor(private readonly configService: ConfigService) {}
2667

27-
if (!hostname || !port || !username || !password) {
28-
Logger.error(
29-
"RabbitMQ is enabled but missing one or more config variables.",
30-
"RabbitMQService",
68+
getValueFromConfig(key: string): string | number {
69+
const configValue = this.configService.get(`rabbitMq.${key}`);
70+
if (!configValue)
71+
throw new Error(
72+
`RabbitMQ is enabled but missing the config variable ${key}.`,
3173
);
32-
} else {
33-
this.connectionOptions = {
34-
protocol: "amqp",
35-
hostname: hostname,
36-
port: port,
37-
username: username,
38-
password: password,
39-
};
74+
return configValue;
75+
}
4076

41-
amqp.connect(
42-
this.connectionOptions,
43-
(connectionError: Error, connection: Connection) => {
44-
if (connectionError) {
45-
Logger.error(
46-
"Connection error in RabbitMQService: " +
47-
JSON.stringify(connectionError.message),
48-
"RabbitMQService",
49-
);
50-
return;
51-
}
52-
this.connection = connection;
77+
parseConfig(): void {
78+
const connectionOptions: Record<string, string | number> = {};
79+
RabbitMQService.configKeys.forEach(
80+
(configKey) =>
81+
(connectionOptions[configKey] = this.getValueFromConfig(configKey)),
82+
);
83+
connectionOptions.protocol = "amqp";
84+
this.connectionOptions = connectionOptions as typeof this.connectionOptions;
85+
}
5386

54-
this.connection.createChannel(
55-
(channelError: Error, channel: Channel) => {
56-
if (channelError) {
57-
Logger.error(
58-
"Channel error in RabbitMQService: " +
59-
JSON.stringify(channelError.message),
60-
"RabbitMQService",
61-
);
62-
return;
63-
}
64-
this.channel = channel;
65-
},
66-
);
67-
},
68-
);
69-
}
87+
async onModuleInit(): Promise<void> {
88+
Logger.log("Initializing RabbitMQService.", "RabbitMQService");
89+
this.parseConfig();
90+
await this.connect();
7091
}
7192

72-
private async connect(queue: string, exchange: string, key: string) {
73-
try {
74-
await this.channel.assertQueue(queue, { durable: true });
75-
await this.channel.assertExchange(exchange, "topic", {
76-
durable: true,
77-
});
78-
await this.channel.bindQueue(queue, exchange, key);
79-
} catch (error) {
80-
throw new Error(
81-
`Could not connect to RabbitMQ queue ${queue} with exchange ${exchange} and key ${key}.`,
82-
{ cause: error },
83-
);
84-
}
93+
@OnError("log")
94+
private async connect(): Promise<void> {
95+
this._error = "Cannot connect to RabbitMQ: ";
96+
this.connection = await amqplibConnect(this.connectionOptions);
97+
this._error = "Channel error in RabbitMQService: ";
98+
this.channel = await this.connection.createChannel();
8599
}
86100

87-
async sendMessage(queue: string, exchange: string, key: string, message: string) {
88-
try {
89-
await this.connect(queue, exchange, key);
90-
await this.channel.publish(exchange, key, Buffer.from(message), {
91-
persistent: true,
92-
});
93-
} catch (error) {
94-
throw new Error(`Could not send message to RabbitMQ queue ${queue}.`, {
95-
cause: error,
96-
});
97-
}
101+
@OnError()
102+
private async bindQueue(queue: string, exchange: string, key: string) {
103+
this._error = `Could not connect to RabbitMQ queue ${queue} with exchange ${exchange} and key ${key}.`;
104+
await this.channel.assertQueue(queue, { durable: true });
105+
await this.channel.assertExchange(exchange, "topic", {
106+
durable: true,
107+
});
108+
await this.channel.bindQueue(queue, exchange, key);
109+
}
110+
111+
@OnError()
112+
async sendMessage(
113+
queue: string,
114+
exchange: string,
115+
key: string,
116+
message: string,
117+
) {
118+
this._error = `Could not send message to RabbitMQ queue ${queue}.`;
119+
await this.bindQueue(queue, exchange, key);
120+
this.channel.publish(exchange, key, Buffer.from(message), {
121+
persistent: true,
122+
});
98123
}
99124

100125
async close(): Promise<void> {
101126
if (this.channel) {
102-
await this.channel.close(() => {
103-
Logger.log("RabbitMQ channel closed.", "RabbitMQService");
104-
});
127+
await this.channel.close();
128+
Logger.log("RabbitMQ channel closed.", "RabbitMQService");
105129
}
106130
if (this.connection) {
107-
await this.connection.close(() => {
108-
Logger.log("RabbitMQ connection closed.", "RabbitMQService");
109-
});
131+
await this.connection.close();
132+
Logger.log("RabbitMQ connection closed.", "RabbitMQService");
110133
}
111134
}
112135

0 commit comments

Comments
 (0)