Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,21 @@ services:
DATABASE_URL: postgres://postgres:password@postgres:5432/mydb?schema=public
command: ['node', 'dist/src/apps/consumers/message-consumers/main.js']

moderator:
build:
context: .
volumes:
- .:/app
platform: linux/amd64
depends_on:
postgres:
condition: service_healthy
rabbitmq:
condition: service_started
environment:
RABBITMQ_URL: amqp://user:password@rabbitmq:5672
DATABASE_URL: postgres://postgres:password@postgres:5432/mydb?schema=public
command: ['node', 'dist/src/apps/moderator/main.js']

volumes:
postgres_data:
27 changes: 27 additions & 0 deletions src/apps/moderator/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { ModeratorModule } from './moderator.module';
import { NestFactory } from '@nestjs/core';

async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
ModeratorModule,
{
transport: Transport.RMQ,
options: {
urls: [process.env.RABBITMQ_URL],
queue: 'moderator',
queueOptions: {
durable: true,
messageTtl: 60000,
deadLetterExchange: 'message',
deadLetterRoutingKey: 'dead',
},
noAck: false,
prefetchCount: 1,
},
},
);
await app.listen();
}

bootstrap();
20 changes: 20 additions & 0 deletions src/apps/moderator/moderator.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ModeratorController } from './moderator.controller';
import { ModeratorService } from './moderator.service';

describe('ModeratorController', () => {
let controller: ModeratorController;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [ModeratorController],
providers: [ModeratorService],
}).compile();

controller = module.get<ModeratorController>(ModeratorController);
});

it('should be defined', () => {
expect(controller).toBeDefined();
});
});
13 changes: 13 additions & 0 deletions src/apps/moderator/moderator.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Controller } from '@nestjs/common';
import { ModeratorService } from './moderator.service';
import { Ctx, RmqContext, Payload, EventPattern } from '@nestjs/microservices';

@Controller()
export class ModeratorController {
constructor(private readonly moderatorService: ModeratorService) {}

@EventPattern()
async handleMessage(@Payload() data: any, @Ctx() context: RmqContext) {
return await this.moderatorService.handleMessage(data, context);
}
}
9 changes: 9 additions & 0 deletions src/apps/moderator/moderator.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { ModeratorService } from './moderator.service';
import { ModeratorController } from './moderator.controller';

@Module({
controllers: [ModeratorController],
providers: [ModeratorService],
})
export class ModeratorModule {}
18 changes: 18 additions & 0 deletions src/apps/moderator/moderator.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ModeratorService } from './moderator.service';

describe('ModeratorService', () => {
let service: ModeratorService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [ModeratorService],
}).compile();

service = module.get<ModeratorService>(ModeratorService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
9 changes: 9 additions & 0 deletions src/apps/moderator/moderator.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Injectable } from '@nestjs/common';
import { RmqContext } from '@nestjs/microservices';

@Injectable()
export class ModeratorService {
async handleMessage(message: any, context: RmqContext) {
console.log({ message: message, context: context });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,33 @@ import * as amqp from 'amqplib';
@Injectable()
export class MessageExchangeQueuesService {
private readonly QUEUE_NAMES = ['message', 'database'];
private readonly ROUTING_KEY = 'message';
private readonly PRE_CONV_QUEUE_NAMES = ['moderator', 'database'];
private channel: amqp.Channel;

async init(channel: amqp.Channel, exchangeName: string) {
this.channel = channel;
await this.createQueue(exchangeName);
await this.createQueue(exchangeName, this.QUEUE_NAMES, 'message');
await this.createQueue(
exchangeName,
this.PRE_CONV_QUEUE_NAMES,
'moderator',
);
}
async createQueue(exchangeName: string) {
for (const queueName of this.QUEUE_NAMES) {

async createQueue(
exchangeName: string,
queueNames: string[],
routingKey: string,
) {
for (const queueName of queueNames) {
try {
await this.channel.assertQueue(queueName, {
durable: true,
messageTtl: 60000,
deadLetterExchange: 'message',
deadLetterRoutingKey: 'dead',
});
await this.channel.bindQueue(queueName, exchangeName, this.ROUTING_KEY);
await this.channel.bindQueue(queueName, exchangeName, routingKey);
} catch (error) {
console.error(error);
}
Expand Down
6 changes: 5 additions & 1 deletion src/modules/message/platform/telegram-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ export class TelegramMessageStrategy implements MessageStrategy {
const data = JSON.stringify(
await this.rabbitmqService.getMessageEchangeData(formattedMessage),
);
this.messageExchangeService.send('message', data);
if (!formattedMessage.conversationId) {
await this.messageExchangeService.send('moderator', data);
} else if (formattedMessage.conversationId) {
await this.messageExchangeService.send('message', data);
}

return 'ok';
}
Expand Down