Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bull mq integration 2 #2983

Open
wants to merge 23 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,591 changes: 1,129 additions & 462 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions services/workflows-service/docker-compose.redis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: '3.8'
services:
redis:
image: redis:alpine
ports:
- '${REDIS_PORT}:6379'
volumes:
- redis-data:/data
command: >
--requirepass ${REDIS_PASSWORD}
--appendonly yes
environment:
- REDIS_PASSWORD=${REDIS_PASSWORD}
- REDIS_PORT=${REDIS_PORT}
networks:
- app-network
volumes:
redis-data:
driver: local
networks:
app-network:
driver: bridge
9 changes: 8 additions & 1 deletion services/workflows-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"description": "workflow-service",
"scripts": {
"spellcheck": "cspell \"*\"",
"setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run db:reset && npm run seed",
"setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && npm run db:reset && npm run seed",
"format": "prettier --write . '!**/*.{md,hbs}'",
"format:check": "prettier --check . '!**/*.{md,hbs}'",
"lint": "eslint . --fix",
Expand All @@ -32,6 +32,8 @@
"db:reset:dev:with-data": "npm run db:reset:dev && npm run db:data-migration:migrate && npm run db:data-sync",
"db:init": "npm run db:migrate-dev -- --name 'initial version' && npm run db:migrate-up seed",
"prisma:generate": "prisma generate",
"docker:redis": "docker compose -f docker-compose.redis.yml up -d --wait",
"docker:redis:down": "docker compose -f docker-compose.redis.yml down --volumes",
"docker:db": "docker compose -f docker-compose.db.yml up -d --wait",
"docker:db:down": "docker compose -f docker-compose.db.yml down --volumes",
"docker:build": "docker build .",
Expand All @@ -53,8 +55,12 @@
"@ballerine/common": "0.9.70",
"@ballerine/workflow-core": "0.6.89",
"@ballerine/workflow-node-sdk": "0.6.89",
"@bull-board/api": "^6.7.1",
"@bull-board/express": "^6.7.1",
"@bull-board/nestjs": "^6.7.1",
"@faker-js/faker": "^7.6.0",
"@nestjs/axios": "^2.0.0",
"@nestjs/bullmq": "^10.2.1",
"@nestjs/common": "^9.3.12",
"@nestjs/config": "2.3.1",
"@nestjs/core": "^9.3.12",
Expand All @@ -81,6 +87,7 @@
"ballerine-nestjs-typebox": "3.0.2-next.11",
"base64-stream": "^1.0.0",
"bcrypt": "5.1.0",
"bullmq": "^5.13.2",
"class-transformer": "0.5.1",
"class-validator": "0.14.0",
"concat-stream": "^2.0.0",
Expand Down
2 changes: 1 addition & 1 deletion services/workflows-service/prisma/data-migrations
4 changes: 4 additions & 0 deletions services/workflows-service/src/alert/alert.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { ProjectModule } from '@/project/project.module';
import { UserRepository } from '@/user/user.repository';
import { AlertDefinitionModule } from '@/alert-definition/alert-definition.module';
import { SentryModule } from '@/sentry/sentry.module';
import { BullMqModule } from '@/bull-mq/bull-mq.module';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

@Module({
imports: [
Expand All @@ -32,6 +34,8 @@ import { SentryModule } from '@/sentry/sentry.module';
PrismaModule,
SentryModule,
ProjectModule,
BullMqModule,
OutgoingWebhooksModule,
HttpModule.register({
timeout: 5000,
maxRedirects: 10,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import { alertWebhookFailure } from '@/events/alert-webhook-failure';
import { lastValueFrom } from 'rxjs';
import * as common from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ClsService } from 'nestjs-cls';
import { SentryInterceptor } from '@/sentry/sentry.interceptor';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { WebhookEventEmitterService } from './webhook-event-emitter.service';
import { IWebhookEntityEventData } from './types';
import { Webhook } from '@/events/get-webhooks';
import { HttpService } from '@nestjs/axios';
import { sign } from '@ballerine/common';
import { AnyRecord } from '@ballerine/common';
import { env } from '@/env';
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service';
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service';

@common.Injectable()
export abstract class WebhookHttpService extends HttpService {}
export class WebhookHttpService {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Remove empty WebhookHttpService class.

The WebhookHttpService class appears to be empty and serves no purpose. Consider removing it since the webhook functionality has been moved to dedicated services.

-@common.Injectable()
-export class WebhookHttpService {}


@common.Injectable()
@common.UseInterceptors(SentryInterceptor)
export class WebhookManagerService {
constructor(
private readonly cls: ClsService,
protected readonly logger: AppLoggerService,
protected readonly configService: ConfigService,
protected readonly httpService: WebhookHttpService,
protected readonly outgoingQueueWebhookService: OutgoingWebhookQueueService,
protected readonly outgoingWebhookService: OutgoingWebhooksService,
protected readonly webhookEventEmitter: WebhookEventEmitterService,
) {
webhookEventEmitter.on('*', async (eventData: any) => {
Expand All @@ -45,21 +45,34 @@ export class WebhookManagerService {
webhook: Webhook;
webhookSharedSecret: string;
}) {
try {
const { id, url, environment, apiVersion } = webhook;
const { id, url, environment, apiVersion } = webhook;

if (env.QUEUE_SYSTEM_ENABLED) {
return await this.outgoingQueueWebhookService.addJob({
url,
method: 'POST',
headers: {},
body: data as unknown as AnyRecord,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid type assertion with unknown.

The type assertion data as unknown as AnyRecord is a code smell. Consider proper type validation or transformation.

-body: data as unknown as AnyRecord,
+body: validateAndTransformData(data),

+function validateAndTransformData<T>(data: T): AnyRecord {
+  // Add validation logic here
+  return data as AnyRecord;
+}

Also applies to: 68-68

timeout: 15_000,
secret: webhookSharedSecret,
});
}

try {
this.logger.log('Sending webhook', { id, url });

const res = await lastValueFrom(
this.httpService.post(url, data, {
headers: {
'X-HMAC-Signature': sign({
payload: data,
key: webhookSharedSecret,
}),
},
}),
);
const response = await this.outgoingWebhookService.invokeWebhook({
url,
method: 'POST',
headers: {},
body: data as unknown as AnyRecord,
timeout: 15_000,
secret: webhookSharedSecret,
});

if (response.status < 200 || response.status >= 300) {
throw new Error(`Webhook failed with status ${response.status} for ${url}`);
}
} catch (error: Error | any) {
this.logger.error('Webhook error data', {
data,
Expand Down
8 changes: 6 additions & 2 deletions services/workflows-service/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import { WorkflowModule } from '@/workflow/workflow.module';
import { TransactionModule } from '@/transaction/transaction.module';
import { AlertModule } from '@/alert/alert.module';
import { SwaggerController } from './swagger/swagger.controller';
import { WebhooksModule } from '@/webhooks/webhooks.module';
import { BusinessReportModule } from '@/business-report/business-report.module';
import { ScheduleModule } from '@nestjs/schedule';
import { CronModule } from '@/workflow/cron/cron.module';
Expand All @@ -49,6 +48,9 @@ import { RuleEngineModule } from './rule-engine/rule-engine.module';
import { NotionModule } from '@/notion/notion.module';
import { SecretsManagerModule } from '@/secrets-manager/secrets-manager.module';
import { NoteModule } from '@/note/note.module';
import { BullMqModule } from './bull-mq/bull-mq.module';
import { OutgoingWebhooksModule } from './webhooks/outgoing-webhooks/outgoing-webhooks.module';
import { IncomingWebhooksModule } from './webhooks/incoming/incoming-webhooks.module';

export const validate = async (config: Record<string, unknown>) => {
const zodEnvSchema = z
Expand Down Expand Up @@ -91,7 +93,8 @@ export const validate = async (config: Record<string, unknown>) => {
EventEmitterModule.forRoot(),
UserModule,
WorkflowModule,
WebhooksModule,
OutgoingWebhooksModule,
IncomingWebhooksModule,
NoteModule,
UiDefinitionModule,
StorageModule,
Expand Down Expand Up @@ -133,6 +136,7 @@ export const validate = async (config: Record<string, unknown>) => {
RuleEngineModule,
NotionModule,
SecretsManagerModule,
BullMqModule,
],
providers: [
{
Expand Down
56 changes: 56 additions & 0 deletions services/workflows-service/src/bull-mq/bull-mq.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { BullModule, RegisterQueueOptions } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';

import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { BullBoardModule } from '@bull-board/nestjs';

import { QUEUES } from '@/bull-mq/consts';
import { IncomingWebhookQueueService } from '@/bull-mq/queues/incoming-webhook-queue.service';
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service';
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

@Module({
imports: [
AppLoggerModule,
ConfigModule,
OutgoingWebhooksModule,
// Register bull & init redis connection
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
connection: {
host: configService.get('REDIS_HOST'),
port: configService.get('REDIS_PORT'),
password: configService.get('REDIS_PASSWORD'),
},
}),
inject: [ConfigService],
}),
// Register bull board module at /api/queues
BullBoardModule.forRoot({
route: '/queues',
adapter: ExpressAdapter,
}),
Comment on lines +33 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Secure access to Bull Board.
Exposing the Bull Board at /queues can create security or privacy concerns in production. Consider restricting access via authentication, IP whitelisting, or at least an environment-based guard to prevent unauthorized queue monitoring.

// Register queues and pass config to bull board forFeature
...Object.values(QUEUES).flatMap(queue => {
const queues: Array<Omit<RegisterQueueOptions, 'name'> & { name: string }> = [
{ name: queue.name, ...queue.config },
];

if ('dlq' in queue) {
queues.push({ name: queue.dlq });
}

return queues.flatMap(queue => [
BullModule.registerQueue(queue),
BullBoardModule.forFeature({ name: queue.name, adapter: BullMQAdapter }),
]);
}),
],
providers: [OutgoingWebhookQueueService, IncomingWebhookQueueService],
exports: [OutgoingWebhookQueueService, IncomingWebhookQueueService],
})
export class BullMqModule {}
37 changes: 37 additions & 0 deletions services/workflows-service/src/bull-mq/consts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { BaseJobOptions } from 'bullmq/dist/esm/interfaces';

export const QUEUES = {
DEFAULT: {
name: 'default',
dlq: 'default-dlq',
config: {
attempts: 15,
backoff: {
type: 'exponential',
delay: 1000,
},
},
},
INCOMING_WEBHOOKS_QUEUE: {
name: 'incoming-webhook-queue',
dlq: 'incoming-webhook-queue-dlq',
config: {
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000,
},
},
},
OUTGOING_WEBHOOKS_QUEUE: {
name: 'outgoing-webhook-queue',
dlq: 'outgoing-webhook-queue-dlq',
config: {
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000,
},
},
},
} satisfies Record<string, { name: string; dlq?: string; config: BaseJobOptions }>;
Loading
Loading