Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
r4zendev committed Jan 27, 2025
1 parent 63a3a44 commit bba7beb
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { IWebhookEntityEventData } from './types';
import { Webhook } from '@/events/get-webhooks';
import { AnyRecord } from '@ballerine/common';
import { env } from '@/env';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service';
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service';

@common.Injectable()
Expand Down
84 changes: 34 additions & 50 deletions services/workflows-service/src/bull-mq/bull-mq.module.ts
Original file line number Diff line number Diff line change
@@ -1,68 +1,52 @@
import { BullModule, RegisterQueueOptions } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { BullBoardModule } from '@bull-board/nestjs';
import { ExpressAdapter } from '@bull-board/express';
import { ConfigModule, ConfigService } from '@nestjs/config';

import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
import { ExpressAdapter } from '@bull-board/express';
import { BullBoardModule } from '@bull-board/nestjs';

import { QUEUES } from '@/bull-mq/consts';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { REDIS_CONFIG } from '@/redis/const/redis-config';
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service';
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

// eslint-disable-next-line prefer-arrow/prefer-arrow-functions
function composeQueueAndDlqBoard(queue: (typeof QUEUES)[keyof typeof QUEUES]) {
const baseFeature = BullBoardModule.forFeature({
name: queue.name,
adapter: BullMQAdapter,
});

const dlqFeature =
'dlq' in queue
? BullBoardModule.forFeature({
name: `${queue.name}-dlq`,
adapter: BullMQAdapter,
})
: null;

return dlqFeature ? [baseFeature, dlqFeature] : [baseFeature];
}

const composeInitiateQueueWithDlq = (queue: (typeof QUEUES)[keyof typeof QUEUES]) =>
[
{
name: queue.name,
...queue.config,
},
'dlq' in queue && {
name: queue.dlq,
},
].filter(Boolean);

@Module({
imports: [
AppLoggerModule,
OutgoingWebhooksModule,
// Register bull & init redis connection
BullModule.forRootAsync({
useFactory: () => {
return {
connection: {
...REDIS_CONFIG,
},
};
},
}),
BullModule.registerQueue(
...Object.values(QUEUES).flatMap(queue => {
return composeInitiateQueueWithDlq(queue);
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get('REDIS_HOST'),
port: configService.get('REDIS_PORT'),
password: configService.get('REDIS_PASSWORD'),
},
}),
),
inject: [ConfigService],
}),
// Register bull board module at /api/queues
BullBoardModule.forRoot({
route: '/queues',
adapter: ExpressAdapter,
}),
...Object.values(QUEUES)
.map(queue => composeQueueAndDlqBoard(queue))
.flat(),
// Register queues and pass config to bull board forFeature
...Object.values(QUEUES).flatMap(queue => {
const queues: Array<Omit<RegisterQueueOptions, 'name'> & { name: string }> = [
{ name: queue.name, ...queue.config },
];

if ('dlq' in queue) {
queues.push({ name: queue.dlq });
}

return queues.flatMap(queue => [
BullModule.registerQueue(queue),
BullBoardModule.forFeature({ name: queue.name, adapter: BullMQAdapter }),
]);
}),
],
providers: [OutgoingWebhookQueueService],
exports: [BullModule, OutgoingWebhookQueueService],
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { WorkerListener } from 'bullmq/dist/esm/classes/worker';
import { TJobPayloadMetadata } from '@/bull-mq/types';

@Injectable()
export abstract class BaseQueueWorkerService<T = any> implements OnModuleDestroy, OnModuleInit {
export abstract class BaseQueueWorkerService<T = unknown> implements OnModuleDestroy, OnModuleInit {
protected queue?: Queue;
protected worker?: Worker;
protected connectionOptions: ConnectionOptions;
Expand All @@ -34,9 +34,7 @@ export abstract class BaseQueueWorkerService<T = any> implements OnModuleDestroy
const queueConfig = currentQueue[1];
this.queue = new Queue(queueName, {
connection: this.connectionOptions,
defaultJobOptions: {
...queueConfig.config,
},
defaultJobOptions: queueConfig.config,
});

this.deadLetterQueue =
Expand Down Expand Up @@ -122,14 +120,14 @@ export abstract class BaseQueueWorkerService<T = any> implements OnModuleDestroy
worker?.on(eventName, listener);
}

protected setQueueListener<T extends keyof QueueListener<any, any, any>>({
protected setQueueListener<T extends keyof QueueListener<unknown, unknown, string>>({
queue,
eventName,
listener,
}: {
queue: Queue | undefined;
eventName: T;
listener: QueueListener<any, any, any>[T];
listener: QueueListener<unknown, unknown, string>[T];
}) {
queue?.removeAllListeners(eventName);
queue?.on(eventName, listener);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { Injectable } from '@nestjs/common';
import { BaseQueueWorkerService } from '@/bull-mq/base/base-queue-worker.service';
import { IncomingWebhookData } from '@/bull-mq/incoming-webhook/types/types';
import { BaseQueueWorkerService } from '@/bull-mq/queues/base-queue-worker.service';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { QUEUES } from '@/bull-mq/consts';
import { Job } from 'bullmq';
import { TJobPayloadMetadata } from '@/bull-mq/types';

type TJobsWebhookIncoming = { jobData: IncomingWebhookData; metadata: TJobPayloadMetadata };
interface IncomingWebhookData {
source: string;
payload: Record<string, unknown>;
service: (payload: Record<string, unknown>) => Promise<void>;
}

@Injectable()
export class IncomingWebhookQueueService extends BaseQueueWorkerService<IncomingWebhookData> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Injectable } from '@nestjs/common';
import { Job } from 'bullmq';

import { QUEUES } from '@/bull-mq/consts';
import { BaseQueueWorkerService } from '@/bull-mq/queues/base-queue-worker.service';
import { TJobPayloadMetadata } from '@/bull-mq/types';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service';

type WebhookJobData = Parameters<OutgoingWebhooksService['invokeWebhook']>[0];
type TJobArgs = { jobData: WebhookJobData; metadata: TJobPayloadMetadata };

@Injectable()
export class OutgoingWebhookQueueService extends BaseQueueWorkerService<WebhookJobData> {
constructor(
protected readonly logger: AppLoggerService,
protected outgoingWebhookService: OutgoingWebhooksService,
) {
super(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name, logger);
this.initializeWorker();
}

async handleJob(job: Job<TJobArgs>) {
await this.outgoingWebhookService.invokeWebhook(job.data.jobData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ConfigService } from '@nestjs/config';
import type { TAuthenticationConfiguration } from '@/customer/types';
import { CustomerService } from '@/customer/customer.service';
import { env } from '@/env';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service';
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service';

const getExtensionFromMimeType = (mimeType: string) => {
Expand All @@ -38,7 +38,7 @@ export class DocumentChangedWebhookCaller {
private readonly logger: AppLoggerService,
private readonly customerService: CustomerService,
private readonly outgoingWebhookQueueService: OutgoingWebhookQueueService,
private readonly outgoingWebhooksService: OutgoingWebhooksService,
private readonly outgoingWebhookService: OutgoingWebhooksService,
) {
this.#__axios = this.httpService.axiosRef;

Expand Down Expand Up @@ -217,7 +217,7 @@ export class DocumentChangedWebhookCaller {
}

try {
const res = await this.outgoingWebhooksService.invokeWebhook(webhookArgs);
const res = await this.outgoingWebhookService.invokeWebhook(webhookArgs);
this.logger.log('Webhook Result:', {
status: res.status,
statusText: res.statusText,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import type { TAuthenticationConfiguration } from '@/customer/types';
import { CustomerService } from '@/customer/customer.service';
import { WorkflowRuntimeDataRepository } from '@/workflow/workflow-runtime-data.repository';
import { env } from '@/env';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service';

@Injectable()
export class WorkflowCompletedWebhookCaller {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { getWebhooks, Webhook } from '@/events/get-webhooks';
import { CustomerService } from '@/customer/customer.service';
import type { TAuthenticationConfiguration } from '@/customer/types';
import { env } from '@/env';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service';
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service';

@Injectable()
Expand All @@ -24,7 +24,7 @@ export class WorkflowStateChangedWebhookCaller {
private readonly logger: AppLoggerService,
private readonly customerService: CustomerService,
private readonly outgoingWebhookQueueService: OutgoingWebhookQueueService,
private readonly outgoingWebhooksService: OutgoingWebhooksService,
private readonly outgoingWebhookService: OutgoingWebhooksService,
) {
this.#__axios = this.httpService.axiosRef;

Expand Down Expand Up @@ -109,7 +109,7 @@ export class WorkflowStateChangedWebhookCaller {
}

try {
const res = await this.outgoingWebhooksService.invokeWebhook(webhookArgs);
const res = await this.outgoingWebhookService.invokeWebhook(webhookArgs);

this.logger.log('Webhook Result:', {
status: res.status,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import { AnyRecord, sign } from '@ballerine/common';
import { Injectable } from '@nestjs/common';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import axios, { Method, RawAxiosRequestHeaders } from 'axios';
import { AnyRecord, isErrorWithMessage, sign } from '@ballerine/common';

import { AppLoggerService } from '@/common/app-logger/app-logger.service';

type WebhookInvocationConfig = {
url: string;
method: Method;
headers?: Partial<RawAxiosRequestHeaders>;
body?: AnyRecord | string;
timeout?: number;
secret?: string;
};

@Injectable()
export class OutgoingWebhooksService {
Expand All @@ -14,14 +24,7 @@ export class OutgoingWebhooksService {
body,
timeout,
secret,
}: {
url: string;
method: Method;
headers?: Partial<RawAxiosRequestHeaders>;
body?: AnyRecord | string;
timeout?: number;
secret?: string;
}) {
}: WebhookInvocationConfig) {
const headers: RawAxiosRequestHeaders = {
Accept: 'application/json',
'Content-Type': 'application/json',
Expand Down

0 comments on commit bba7beb

Please sign in to comment.