Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
r4zendev committed Jan 28, 2025
1 parent f33927a commit 1a7acc9
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 179 deletions.
5 changes: 4 additions & 1 deletion services/workflows-service/src/alert/alert.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-we
],
controllers: [AlertControllerInternal, AlertControllerExternal],
providers: [
WebhookHttpService,
{
provide: WebhookHttpService,
useExisting: HttpService,
},
AlertService,
AlertRepository,
AlertDefinitionRepository,
Expand Down

This file was deleted.

6 changes: 4 additions & 2 deletions services/workflows-service/src/bull-mq/bull-mq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import { ExpressAdapter } from '@bull-board/express';
import { BullBoardModule } from '@bull-board/nestjs';

import { QUEUES } from '@/bull-mq/consts';
import { IncomingWebhookQueueService } from '@/bull-mq/queues/incoming-webhook-queue.service';
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service';
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

@Module({
imports: [
AppLoggerModule,
ConfigModule,
OutgoingWebhooksModule,
// Register bull & init redis connection
BullModule.forRootAsync({
Expand Down Expand Up @@ -48,7 +50,7 @@ import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-we
]);
}),
],
providers: [OutgoingWebhookQueueService],
exports: [BullModule, OutgoingWebhookQueueService],
providers: [OutgoingWebhookQueueService, IncomingWebhookQueueService],
exports: [OutgoingWebhookQueueService, IncomingWebhookQueueService],
})
export class BullMqModule {}
Original file line number Diff line number Diff line change
@@ -1,59 +1,42 @@
import { ConnectionOptions, Job, Queue, QueueListener, Worker } from 'bullmq';
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { REDIS_CONFIG } from '@/redis/const/redis-config';
import { env } from '@/env';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { QUEUES } from '@/bull-mq/consts';
import { ConfigService } from '@nestjs/config';
import { Job, Queue, QueueListener, Worker } from 'bullmq';
import { WorkerListener } from 'bullmq/dist/esm/classes/worker';
import { randomUUID } from 'node:crypto';

import { TJobPayloadMetadata } from '@/bull-mq/types';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';

@Injectable()
export abstract class BaseQueueWorkerService<T = unknown> implements OnModuleDestroy, OnModuleInit {
protected queue?: Queue;
protected worker?: Worker;
protected connectionOptions: ConnectionOptions;
protected deadLetterQueue?: Queue | undefined;

protected constructor(protected queueName: string, protected readonly logger: AppLoggerService) {
this.connectionOptions = {
...REDIS_CONFIG,
};

if (!env.QUEUE_SYSTEM_ENABLED) {
protected constructor(
protected readonly queue: Queue,
protected readonly deadLetterQueue: Queue,
protected readonly logger: AppLoggerService,
protected readonly configService: ConfigService,
) {
if (!this.configService.get('QUEUE_SYSTEM_ENABLED')) {
return;
}

const currentQueue = Object.entries(QUEUES).find(
([_, queueOptions]) => queueOptions.name === queueName,
);

if (!currentQueue) {
throw new Error(`Queue configuration of ${queueName} not found in QUEUES`);
}

const queueConfig = currentQueue[1];
this.queue = new Queue(queueName, {
connection: this.connectionOptions,
defaultJobOptions: queueConfig.config,
});

this.deadLetterQueue =
'dlq' in queueConfig
? new Queue(queueConfig.dlq, { connection: this.connectionOptions })
: undefined;

this.initializeWorker();
}

abstract handleJob(job: Job<{ jobData: T; metadata: TJobPayloadMetadata }>): Promise<void>;

async addJob(jobData: T, metadata: TJobPayloadMetadata = {}, jobOptions = {}): Promise<void> {
await this.queue?.add(this.queueName, { metadata, jobData }, jobOptions);
await this.queue?.add(randomUUID(), { metadata, jobData }, jobOptions);
}

protected initializeWorker() {
this.worker = new Worker(this.queueName, this.handleJob.bind(this), {
connection: this.connectionOptions,
this.worker = new Worker(this.queue.name, this.handleJob.bind(this), {
connection: {
host: this.configService.get('REDIS_HOST'),
port: this.configService.get('REDIS_PORT'),
password: this.configService.get('REDIS_PASSWORD'),
},
});

this.addWorkerListeners();
Expand All @@ -72,27 +55,11 @@ export abstract class BaseQueueWorkerService<T = unknown> implements OnModuleDes
this.setWorkerListener({
worker: this.worker,
eventName: 'failed',
listener: async (job, error, prev) => {
const queueConfig =
Object.entries(QUEUES).find(
([_, queueOptions]) => queueOptions.name === this.queueName,
)?.[1]?.config || QUEUES.DEFAULT.config;

const maxAllowedRetries = queueConfig.attempts;
const currentAttempts = job?.attemptsMade ?? 0;

if (currentAttempts >= maxAllowedRetries) {
if (this.deadLetterQueue) {
this.logger.error(`Job ${job?.id} failed permanently. Moving to DLQ.`);
await this.deadLetterQueue.add(`${this.queueName}-dlq`, job?.data);
}

this.logger.error(`Job ${job?.id} failed permanently. Max attempts reached.`);
} else {
this.logger.warn(
`Webhook job ${job?.id} failed. Attempt: ${currentAttempts}. Error: ${error.message}`,
);
}
listener: async job => {
if (!job?.opts.attempts || job.attemptsMade < job.opts.attempts) return;

this.logger.error(`Job ${job?.id} failed permanently. Moving to DLQ.`);
await this.deadLetterQueue.add(randomUUID(), job?.data);
},
});
}
Expand Down Expand Up @@ -137,7 +104,7 @@ export abstract class BaseQueueWorkerService<T = unknown> implements OnModuleDes
await this.queue?.pause();
await Promise.all([this.worker?.close(), this.queue?.close()]);

this.logger.log(`Queue ${this.queueName} is paused and closed`);
this.logger.log(`Queue ${this.queue.name} is paused and closed`);
}

async onModuleInit() {
Expand All @@ -151,7 +118,7 @@ export abstract class BaseQueueWorkerService<T = unknown> implements OnModuleDes
const isPausedAfterResume = await this.queue?.isPaused();

if (isPausedAfterResume) {
this.logger.error(`Queue ${this.queueName} is still paused after trying to resume it`);
this.logger.error(`Queue ${this.queue.name} is still paused after trying to resume it`);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { BaseQueueWorkerService } from '@/bull-mq/queues/base-queue-worker.service';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { ConfigService } from '@nestjs/config';
import { Job, Queue } from 'bullmq';

import { QUEUES } from '@/bull-mq/consts';
import { Job } from 'bullmq';
import { BaseQueueWorkerService } from '@/bull-mq/queues/base-queue-worker.service';
import { TJobPayloadMetadata } from '@/bull-mq/types';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
// import { IncomingWebhooksService } from '@/webhooks/incoming/incoming-webhooks.service';

type TJobsWebhookIncoming = { jobData: IncomingWebhookData; metadata: TJobPayloadMetadata };
interface IncomingWebhookData {
Expand All @@ -14,8 +18,15 @@ interface IncomingWebhookData {

@Injectable()
export class IncomingWebhookQueueService extends BaseQueueWorkerService<IncomingWebhookData> {
constructor(protected readonly logger: AppLoggerService) {
super(QUEUES.INCOMING_WEBHOOKS_QUEUE.name, logger);
constructor(
@InjectQueue(QUEUES.INCOMING_WEBHOOKS_QUEUE.name) incomingQueue: Queue,
@InjectQueue(QUEUES.INCOMING_WEBHOOKS_QUEUE.dlq) incomingDLQ: Queue,
// circular dep
// protected readonly incomingWebhookService: IncomingWebhooksService,
protected readonly logger: AppLoggerService,
protected readonly config: ConfigService,
) {
super(incomingQueue, incomingDLQ, logger, config);
}

async handleJob(job: Job<TJobsWebhookIncoming>) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Job } from 'bullmq';
import { ConfigService } from '@nestjs/config';
import { Job, Queue } from 'bullmq';

import { QUEUES } from '@/bull-mq/consts';
import { BaseQueueWorkerService } from '@/bull-mq/queues/base-queue-worker.service';
Expand All @@ -13,11 +15,13 @@ type TJobArgs = { jobData: WebhookJobData; metadata: TJobPayloadMetadata };
@Injectable()
export class OutgoingWebhookQueueService extends BaseQueueWorkerService<WebhookJobData> {
constructor(
@InjectQueue(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name) outgoingQueue: Queue,
@InjectQueue(QUEUES.OUTGOING_WEBHOOKS_QUEUE.dlq) outgoingDLQ: Queue,
protected readonly outgoingWebhookService: OutgoingWebhooksService,
protected readonly logger: AppLoggerService,
protected outgoingWebhookService: OutgoingWebhooksService,
protected readonly config: ConfigService,
) {
super(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name, logger);
this.initializeWorker();
super(outgoingQueue, outgoingDLQ, logger, config);
}

async handleJob(job: Job<TJobArgs>) {
Expand Down
4 changes: 0 additions & 4 deletions services/workflows-service/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@ export const serverEnvSchema = {
REDIS_HOST: z.string(),
REDIS_PASSWORD: z.string().optional(),
REDIS_PORT: z.string().transform(value => Number(value)),
REDIS_DB: z
.string()
.transform(value => Number(value))
.optional(),
QUEUE_SYSTEM_ENABLED: z
.string()
.transform(value => value === 'true')
Expand Down
9 changes: 0 additions & 9 deletions services/workflows-service/src/redis/const/redis-config.ts

This file was deleted.

8 changes: 0 additions & 8 deletions services/workflows-service/src/redis/redis.module.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class OutgoingWebhooksService {
headers['X-HMAC-Signature'] = sign({ payload: body, key: secret });
}

return await axios({
return axios({
url,
method,
headers,
Expand Down
60 changes: 0 additions & 60 deletions services/workflows-service/src/worker-app.module.ts

This file was deleted.

21 changes: 0 additions & 21 deletions services/workflows-service/src/worker-main.ts

This file was deleted.

0 comments on commit 1a7acc9

Please sign in to comment.