diff --git a/services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts b/services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts index d93ccef046..eef1f961e2 100644 --- a/services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts +++ b/services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts @@ -8,7 +8,7 @@ import { IWebhookEntityEventData } from './types'; import { Webhook } from '@/events/get-webhooks'; import { AnyRecord } from '@ballerine/common'; import { env } from '@/env'; -import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service'; +import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service'; import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service'; @common.Injectable() diff --git a/services/workflows-service/src/bull-mq/bull-mq.module.ts b/services/workflows-service/src/bull-mq/bull-mq.module.ts index c3c125077f..0c2e09523e 100644 --- a/services/workflows-service/src/bull-mq/bull-mq.module.ts +++ b/services/workflows-service/src/bull-mq/bull-mq.module.ts @@ -1,68 +1,52 @@ +import { BullModule, RegisterQueueOptions } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; -import { BullModule } from '@nestjs/bullmq'; -import { BullBoardModule } from '@bull-board/nestjs'; -import { ExpressAdapter } from '@bull-board/express'; +import { ConfigModule, ConfigService } from '@nestjs/config'; + import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; -import { AppLoggerModule } from '@/common/app-logger/app-logger.module'; +import { ExpressAdapter } from '@bull-board/express'; +import { BullBoardModule } from '@bull-board/nestjs'; + import { QUEUES } from '@/bull-mq/consts'; -import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service'; -import { REDIS_CONFIG } from '@/redis/const/redis-config'; +import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service'; +import { AppLoggerModule } from '@/common/app-logger/app-logger.module'; import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module'; -// eslint-disable-next-line prefer-arrow/prefer-arrow-functions -function composeQueueAndDlqBoard(queue: (typeof QUEUES)[keyof typeof QUEUES]) { - const baseFeature = BullBoardModule.forFeature({ - name: queue.name, - adapter: BullMQAdapter, - }); - - const dlqFeature = - 'dlq' in queue - ? BullBoardModule.forFeature({ - name: `${queue.name}-dlq`, - adapter: BullMQAdapter, - }) - : null; - - return dlqFeature ? [baseFeature, dlqFeature] : [baseFeature]; -} - -const composeInitiateQueueWithDlq = (queue: (typeof QUEUES)[keyof typeof QUEUES]) => - [ - { - name: queue.name, - ...queue.config, - }, - 'dlq' in queue && { - name: queue.dlq, - }, - ].filter(Boolean); - @Module({ imports: [ AppLoggerModule, OutgoingWebhooksModule, + // Register bull & init redis connection BullModule.forRootAsync({ - useFactory: () => { - return { - connection: { - ...REDIS_CONFIG, - }, - }; - }, - }), - BullModule.registerQueue( - ...Object.values(QUEUES).flatMap(queue => { - return composeInitiateQueueWithDlq(queue); + imports: [ConfigModule], + useFactory: async (configService: ConfigService) => ({ + connection: { + host: configService.get('REDIS_HOST'), + port: configService.get('REDIS_PORT'), + password: configService.get('REDIS_PASSWORD'), + }, }), - ), + inject: [ConfigService], + }), + // Register bull board module at /api/queues BullBoardModule.forRoot({ route: '/queues', adapter: ExpressAdapter, }), - ...Object.values(QUEUES) - .map(queue => composeQueueAndDlqBoard(queue)) - .flat(), + // Register queues and pass config to bull board forFeature + ...Object.values(QUEUES).flatMap(queue => { + const queues: Array & { name: string }> = [ + { name: queue.name, ...queue.config }, + ]; + + if ('dlq' in queue) { + queues.push({ name: queue.dlq }); + } + + return queues.flatMap(queue => [ + BullModule.registerQueue(queue), + BullBoardModule.forFeature({ name: queue.name, adapter: BullMQAdapter }), + ]); + }), ], providers: [OutgoingWebhookQueueService], exports: [BullModule, OutgoingWebhookQueueService], diff --git a/services/workflows-service/src/bull-mq/incoming-webhook/types/types.ts b/services/workflows-service/src/bull-mq/incoming-webhook/types/types.ts deleted file mode 100644 index 610be28e11..0000000000 --- a/services/workflows-service/src/bull-mq/incoming-webhook/types/types.ts +++ /dev/null @@ -1,5 +0,0 @@ -export interface IncomingWebhookData { - source: string; - payload: Record; - service: (payload: Record) => Promise; -} diff --git a/services/workflows-service/src/bull-mq/outgoing-webhook/outgoing-webhook-queue.service.ts b/services/workflows-service/src/bull-mq/outgoing-webhook/outgoing-webhook-queue.service.ts deleted file mode 100644 index 4e06abacff..0000000000 --- a/services/workflows-service/src/bull-mq/outgoing-webhook/outgoing-webhook-queue.service.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { BaseQueueWorkerService } from '@/bull-mq/base/base-queue-worker.service'; -import { AppLoggerService } from '@/common/app-logger/app-logger.service'; -import { QUEUES } from '@/bull-mq/consts'; -import { Job } from 'bullmq'; -import { HttpStatusCode } from 'axios'; -import { WebhookJobData } from '@/bull-mq/outgoing-webhook/types/types'; -import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service'; -import { TJobPayloadMetadata } from '@/bull-mq/types'; - -type TJobArgs = { jobData: WebhookJobData; metadata: TJobPayloadMetadata }; - -@Injectable() -export class OutgoingWebhookQueueService extends BaseQueueWorkerService { - constructor( - protected readonly logger: AppLoggerService, - protected webhookService: OutgoingWebhooksService, - ) { - super(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name, logger); - this.initializeWorker(); - } - - async handleJob(job: Job) { - const response = await this.webhookService.invokeWebhook({ - ...job.data.jobData, - }); - - // if (response.status >= 200 && response.status < 300) { - // return; - // } - - // await this.handleRetryStrategy(response.status, job); - } - - // private async handleRetryStrategy(status: number, job: Job) { - // if (job.opts.attempts && job.attemptsMade >= job.opts.attempts) { - // this.logger.warn(`Job ${job.id} reached the maximum retry attempts (${job.opts.attempts})`); - // throw new Error(`Job ${job.id} failed after reaching max attempts`); - // } - - // let delayMs: number; - - // switch (status) { - // case HttpStatusCode.TooManyRequests: - // case HttpStatusCode.InternalServerError: - // case HttpStatusCode.BadGateway: - // case HttpStatusCode.ServiceUnavailable: - // case HttpStatusCode.GatewayTimeout: - // delayMs = Math.pow(2, job.attemptsMade + 1) * 1000; // Exponential backoff - // break; - - // case HttpStatusCode.RequestTimeout: - // delayMs = 1000 * 60 * (job.attemptsMade + 1); // Linear backoff in minutes - // break; - - // case HttpStatusCode.BadRequest: - // throw new Error(`Webhook job failed with status ${status}: Bad Request`); - - // default: - // throw new Error(`Webhook job failed with status ${status}: Unexpected Error`); - // } - - // await this.retryJob(job, delayMs); - // } - - // private async retryJob(job: Job, delayMs: number) { - // const nextAttempt = job.attemptsMade + 1; - // this.logger.log( - // `Scheduling retry for job ${job.id}. Next attempt: ${nextAttempt}, delay: ${delayMs}ms`, - // ); - - // await job.updateProgress(nextAttempt); - // await job.moveToDelayed(Date.now() + delayMs); - // } -} diff --git a/services/workflows-service/src/bull-mq/outgoing-webhook/types/types.ts b/services/workflows-service/src/bull-mq/outgoing-webhook/types/types.ts deleted file mode 100644 index d1ed90a231..0000000000 --- a/services/workflows-service/src/bull-mq/outgoing-webhook/types/types.ts +++ /dev/null @@ -1,3 +0,0 @@ -import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service'; - -export type WebhookJobData = Parameters[0]; diff --git a/services/workflows-service/src/bull-mq/base/base-queue-worker.service.ts b/services/workflows-service/src/bull-mq/queues/base-queue-worker.service.ts similarity index 93% rename from services/workflows-service/src/bull-mq/base/base-queue-worker.service.ts rename to services/workflows-service/src/bull-mq/queues/base-queue-worker.service.ts index ad7bcbc277..06a366de9a 100644 --- a/services/workflows-service/src/bull-mq/base/base-queue-worker.service.ts +++ b/services/workflows-service/src/bull-mq/queues/base-queue-worker.service.ts @@ -8,7 +8,7 @@ import { WorkerListener } from 'bullmq/dist/esm/classes/worker'; import { TJobPayloadMetadata } from '@/bull-mq/types'; @Injectable() -export abstract class BaseQueueWorkerService implements OnModuleDestroy, OnModuleInit { +export abstract class BaseQueueWorkerService implements OnModuleDestroy, OnModuleInit { protected queue?: Queue; protected worker?: Worker; protected connectionOptions: ConnectionOptions; @@ -34,9 +34,7 @@ export abstract class BaseQueueWorkerService implements OnModuleDestroy const queueConfig = currentQueue[1]; this.queue = new Queue(queueName, { connection: this.connectionOptions, - defaultJobOptions: { - ...queueConfig.config, - }, + defaultJobOptions: queueConfig.config, }); this.deadLetterQueue = @@ -122,14 +120,14 @@ export abstract class BaseQueueWorkerService implements OnModuleDestroy worker?.on(eventName, listener); } - protected setQueueListener>({ + protected setQueueListener>({ queue, eventName, listener, }: { queue: Queue | undefined; eventName: T; - listener: QueueListener[T]; + listener: QueueListener[T]; }) { queue?.removeAllListeners(eventName); queue?.on(eventName, listener); diff --git a/services/workflows-service/src/bull-mq/incoming-webhook/incoming-webhook-queue.service.ts b/services/workflows-service/src/bull-mq/queues/incoming-webhook-queue.service.ts similarity index 77% rename from services/workflows-service/src/bull-mq/incoming-webhook/incoming-webhook-queue.service.ts rename to services/workflows-service/src/bull-mq/queues/incoming-webhook-queue.service.ts index 8a138263b1..8eb15eed71 100644 --- a/services/workflows-service/src/bull-mq/incoming-webhook/incoming-webhook-queue.service.ts +++ b/services/workflows-service/src/bull-mq/queues/incoming-webhook-queue.service.ts @@ -1,12 +1,16 @@ import { Injectable } from '@nestjs/common'; -import { BaseQueueWorkerService } from '@/bull-mq/base/base-queue-worker.service'; -import { IncomingWebhookData } from '@/bull-mq/incoming-webhook/types/types'; +import { BaseQueueWorkerService } from '@/bull-mq/queues/base-queue-worker.service'; import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import { QUEUES } from '@/bull-mq/consts'; import { Job } from 'bullmq'; import { TJobPayloadMetadata } from '@/bull-mq/types'; type TJobsWebhookIncoming = { jobData: IncomingWebhookData; metadata: TJobPayloadMetadata }; +interface IncomingWebhookData { + source: string; + payload: Record; + service: (payload: Record) => Promise; +} @Injectable() export class IncomingWebhookQueueService extends BaseQueueWorkerService { diff --git a/services/workflows-service/src/bull-mq/queues/outgoing-webhook-queue.service.ts b/services/workflows-service/src/bull-mq/queues/outgoing-webhook-queue.service.ts new file mode 100644 index 0000000000..fc040723b8 --- /dev/null +++ b/services/workflows-service/src/bull-mq/queues/outgoing-webhook-queue.service.ts @@ -0,0 +1,26 @@ +import { Injectable } from '@nestjs/common'; +import { Job } from 'bullmq'; + +import { QUEUES } from '@/bull-mq/consts'; +import { BaseQueueWorkerService } from '@/bull-mq/queues/base-queue-worker.service'; +import { TJobPayloadMetadata } from '@/bull-mq/types'; +import { AppLoggerService } from '@/common/app-logger/app-logger.service'; +import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service'; + +type WebhookJobData = Parameters[0]; +type TJobArgs = { jobData: WebhookJobData; metadata: TJobPayloadMetadata }; + +@Injectable() +export class OutgoingWebhookQueueService extends BaseQueueWorkerService { + constructor( + protected readonly logger: AppLoggerService, + protected outgoingWebhookService: OutgoingWebhooksService, + ) { + super(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name, logger); + this.initializeWorker(); + } + + async handleJob(job: Job) { + await this.outgoingWebhookService.invokeWebhook(job.data.jobData); + } +} diff --git a/services/workflows-service/src/events/document-changed-webhook-caller.ts b/services/workflows-service/src/events/document-changed-webhook-caller.ts index 916469899d..e0697a1f87 100644 --- a/services/workflows-service/src/events/document-changed-webhook-caller.ts +++ b/services/workflows-service/src/events/document-changed-webhook-caller.ts @@ -14,7 +14,7 @@ import { ConfigService } from '@nestjs/config'; import type { TAuthenticationConfiguration } from '@/customer/types'; import { CustomerService } from '@/customer/customer.service'; import { env } from '@/env'; -import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service'; +import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service'; import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service'; const getExtensionFromMimeType = (mimeType: string) => { @@ -38,7 +38,7 @@ export class DocumentChangedWebhookCaller { private readonly logger: AppLoggerService, private readonly customerService: CustomerService, private readonly outgoingWebhookQueueService: OutgoingWebhookQueueService, - private readonly outgoingWebhooksService: OutgoingWebhooksService, + private readonly outgoingWebhookService: OutgoingWebhooksService, ) { this.#__axios = this.httpService.axiosRef; @@ -217,7 +217,7 @@ export class DocumentChangedWebhookCaller { } try { - const res = await this.outgoingWebhooksService.invokeWebhook(webhookArgs); + const res = await this.outgoingWebhookService.invokeWebhook(webhookArgs); this.logger.log('Webhook Result:', { status: res.status, statusText: res.statusText, diff --git a/services/workflows-service/src/events/workflow-completed-webhook-caller.ts b/services/workflows-service/src/events/workflow-completed-webhook-caller.ts index 7ae62a8b83..4a3291967f 100644 --- a/services/workflows-service/src/events/workflow-completed-webhook-caller.ts +++ b/services/workflows-service/src/events/workflow-completed-webhook-caller.ts @@ -14,7 +14,7 @@ import type { TAuthenticationConfiguration } from '@/customer/types'; import { CustomerService } from '@/customer/customer.service'; import { WorkflowRuntimeDataRepository } from '@/workflow/workflow-runtime-data.repository'; import { env } from '@/env'; -import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service'; +import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service'; @Injectable() export class WorkflowCompletedWebhookCaller { diff --git a/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts b/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts index fede9182dd..57d0214502 100644 --- a/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts +++ b/services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts @@ -10,7 +10,7 @@ import { getWebhooks, Webhook } from '@/events/get-webhooks'; import { CustomerService } from '@/customer/customer.service'; import type { TAuthenticationConfiguration } from '@/customer/types'; import { env } from '@/env'; -import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service'; +import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service'; import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service'; @Injectable() @@ -24,7 +24,7 @@ export class WorkflowStateChangedWebhookCaller { private readonly logger: AppLoggerService, private readonly customerService: CustomerService, private readonly outgoingWebhookQueueService: OutgoingWebhookQueueService, - private readonly outgoingWebhooksService: OutgoingWebhooksService, + private readonly outgoingWebhookService: OutgoingWebhooksService, ) { this.#__axios = this.httpService.axiosRef; @@ -109,7 +109,7 @@ export class WorkflowStateChangedWebhookCaller { } try { - const res = await this.outgoingWebhooksService.invokeWebhook(webhookArgs); + const res = await this.outgoingWebhookService.invokeWebhook(webhookArgs); this.logger.log('Webhook Result:', { status: res.status, diff --git a/services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts b/services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts index fb5cf012e1..a9f622db35 100644 --- a/services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts +++ b/services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts @@ -1,7 +1,17 @@ +import { AnyRecord, sign } from '@ballerine/common'; import { Injectable } from '@nestjs/common'; -import { AppLoggerService } from '@/common/app-logger/app-logger.service'; import axios, { Method, RawAxiosRequestHeaders } from 'axios'; -import { AnyRecord, isErrorWithMessage, sign } from '@ballerine/common'; + +import { AppLoggerService } from '@/common/app-logger/app-logger.service'; + +type WebhookInvocationConfig = { + url: string; + method: Method; + headers?: Partial; + body?: AnyRecord | string; + timeout?: number; + secret?: string; +}; @Injectable() export class OutgoingWebhooksService { @@ -14,14 +24,7 @@ export class OutgoingWebhooksService { body, timeout, secret, - }: { - url: string; - method: Method; - headers?: Partial; - body?: AnyRecord | string; - timeout?: number; - secret?: string; - }) { + }: WebhookInvocationConfig) { const headers: RawAxiosRequestHeaders = { Accept: 'application/json', 'Content-Type': 'application/json',