diff --git a/docker-compose.yml b/docker-compose.yml index a69fd2dd..ac6dabec 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -100,5 +100,21 @@ services: DATABASE_URL: postgres://postgres:password@postgres:5432/mydb?schema=public command: ['node', 'dist/src/apps/consumers/message-consumers/main.js'] + moderator: + build: + context: . + volumes: + - .:/app + platform: linux/amd64 + depends_on: + postgres: + condition: service_healthy + rabbitmq: + condition: service_started + environment: + RABBITMQ_URL: amqp://user:password@rabbitmq:5672 + DATABASE_URL: postgres://postgres:password@postgres:5432/mydb?schema=public + command: ['node', 'dist/src/apps/moderator/main.js'] + volumes: postgres_data: diff --git a/src/apps/moderator/main.ts b/src/apps/moderator/main.ts new file mode 100644 index 00000000..e3ac3f7e --- /dev/null +++ b/src/apps/moderator/main.ts @@ -0,0 +1,27 @@ +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; +import { ModeratorModule } from './moderator.module'; +import { NestFactory } from '@nestjs/core'; + +async function bootstrap() { + const app = await NestFactory.createMicroservice( + ModeratorModule, + { + transport: Transport.RMQ, + options: { + urls: [process.env.RABBITMQ_URL], + queue: 'moderator', + queueOptions: { + durable: true, + messageTtl: 60000, + deadLetterExchange: 'message', + deadLetterRoutingKey: 'dead', + }, + noAck: false, + prefetchCount: 1, + }, + }, + ); + await app.listen(); +} + +bootstrap(); diff --git a/src/apps/moderator/moderator.controller.spec.ts b/src/apps/moderator/moderator.controller.spec.ts new file mode 100644 index 00000000..81472b03 --- /dev/null +++ b/src/apps/moderator/moderator.controller.spec.ts @@ -0,0 +1,20 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ModeratorController } from './moderator.controller'; +import { ModeratorService } from './moderator.service'; + +describe('ModeratorController', () => { + let controller: ModeratorController; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + controllers: [ModeratorController], + providers: [ModeratorService], + }).compile(); + + controller = module.get(ModeratorController); + }); + + it('should be defined', () => { + expect(controller).toBeDefined(); + }); +}); diff --git a/src/apps/moderator/moderator.controller.ts b/src/apps/moderator/moderator.controller.ts new file mode 100644 index 00000000..4ec4546b --- /dev/null +++ b/src/apps/moderator/moderator.controller.ts @@ -0,0 +1,13 @@ +import { Controller } from '@nestjs/common'; +import { ModeratorService } from './moderator.service'; +import { Ctx, RmqContext, Payload, EventPattern } from '@nestjs/microservices'; + +@Controller() +export class ModeratorController { + constructor(private readonly moderatorService: ModeratorService) {} + + @EventPattern() + async handleMessage(@Payload() data: any, @Ctx() context: RmqContext) { + return await this.moderatorService.handleMessage(data, context); + } +} diff --git a/src/apps/moderator/moderator.module.ts b/src/apps/moderator/moderator.module.ts new file mode 100644 index 00000000..7bbd5c44 --- /dev/null +++ b/src/apps/moderator/moderator.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; +import { ModeratorService } from './moderator.service'; +import { ModeratorController } from './moderator.controller'; + +@Module({ + controllers: [ModeratorController], + providers: [ModeratorService], +}) +export class ModeratorModule {} diff --git a/src/apps/moderator/moderator.service.spec.ts b/src/apps/moderator/moderator.service.spec.ts new file mode 100644 index 00000000..f2fc54de --- /dev/null +++ b/src/apps/moderator/moderator.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ModeratorService } from './moderator.service'; + +describe('ModeratorService', () => { + let service: ModeratorService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ModeratorService], + }).compile(); + + service = module.get(ModeratorService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/apps/moderator/moderator.service.ts b/src/apps/moderator/moderator.service.ts new file mode 100644 index 00000000..b2cf6582 --- /dev/null +++ b/src/apps/moderator/moderator.service.ts @@ -0,0 +1,9 @@ +import { Injectable } from '@nestjs/common'; +import { RmqContext } from '@nestjs/microservices'; + +@Injectable() +export class ModeratorService { + async handleMessage(message: any, context: RmqContext) { + console.log({ message: message, context: context }); + } +} diff --git a/src/common/rabbitmq/message-exchange/message-exchange-queues.service.ts b/src/common/rabbitmq/message-exchange/message-exchange-queues.service.ts index 110b27a4..04ebe767 100644 --- a/src/common/rabbitmq/message-exchange/message-exchange-queues.service.ts +++ b/src/common/rabbitmq/message-exchange/message-exchange-queues.service.ts @@ -4,15 +4,25 @@ import * as amqp from 'amqplib'; @Injectable() export class MessageExchangeQueuesService { private readonly QUEUE_NAMES = ['message', 'database']; - private readonly ROUTING_KEY = 'message'; + private readonly PRE_CONV_QUEUE_NAMES = ['moderator', 'database']; private channel: amqp.Channel; async init(channel: amqp.Channel, exchangeName: string) { this.channel = channel; - await this.createQueue(exchangeName); + await this.createQueue(exchangeName, this.QUEUE_NAMES, 'message'); + await this.createQueue( + exchangeName, + this.PRE_CONV_QUEUE_NAMES, + 'moderator', + ); } - async createQueue(exchangeName: string) { - for (const queueName of this.QUEUE_NAMES) { + + async createQueue( + exchangeName: string, + queueNames: string[], + routingKey: string, + ) { + for (const queueName of queueNames) { try { await this.channel.assertQueue(queueName, { durable: true, @@ -20,7 +30,7 @@ export class MessageExchangeQueuesService { deadLetterExchange: 'message', deadLetterRoutingKey: 'dead', }); - await this.channel.bindQueue(queueName, exchangeName, this.ROUTING_KEY); + await this.channel.bindQueue(queueName, exchangeName, routingKey); } catch (error) { console.error(error); } diff --git a/src/modules/message/platform/telegram-message.ts b/src/modules/message/platform/telegram-message.ts index 34b3c4f6..a49b0f88 100644 --- a/src/modules/message/platform/telegram-message.ts +++ b/src/modules/message/platform/telegram-message.ts @@ -17,7 +17,11 @@ export class TelegramMessageStrategy implements MessageStrategy { const data = JSON.stringify( await this.rabbitmqService.getMessageEchangeData(formattedMessage), ); - this.messageExchangeService.send('message', data); + if (!formattedMessage.conversationId) { + await this.messageExchangeService.send('moderator', data); + } else if (formattedMessage.conversationId) { + await this.messageExchangeService.send('message', data); + } return 'ok'; }