-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/bull mq integration 2 #2983
base: dev
Are you sure you want to change the base?
Changes from all commits
ee38008
3b128e5
8d6a0d2
a868bd4
4edea7d
9000b15
4f31ee5
aa5e4a2
6b8a386
2709ee6
f8dd88c
0c7fbff
435fabd
54406ff
6388019
d9251f3
2229f8a
664f08d
0cf44df
facb5d7
850d21a
f33927a
1a7acc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
version: '3.8' | ||
services: | ||
redis: | ||
image: redis:alpine | ||
ports: | ||
- '${REDIS_PORT}:6379' | ||
volumes: | ||
- redis-data:/data | ||
command: > | ||
--requirepass ${REDIS_PASSWORD} | ||
--appendonly yes | ||
environment: | ||
- REDIS_PASSWORD=${REDIS_PASSWORD} | ||
- REDIS_PORT=${REDIS_PORT} | ||
networks: | ||
- app-network | ||
volumes: | ||
redis-data: | ||
driver: local | ||
networks: | ||
app-network: | ||
driver: bridge |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,27 @@ | ||
import { alertWebhookFailure } from '@/events/alert-webhook-failure'; | ||
import { lastValueFrom } from 'rxjs'; | ||
import * as common from '@nestjs/common'; | ||
import { ConfigService } from '@nestjs/config'; | ||
import { ClsService } from 'nestjs-cls'; | ||
import { SentryInterceptor } from '@/sentry/sentry.interceptor'; | ||
import { AppLoggerService } from '@/common/app-logger/app-logger.service'; | ||
import { WebhookEventEmitterService } from './webhook-event-emitter.service'; | ||
import { IWebhookEntityEventData } from './types'; | ||
import { Webhook } from '@/events/get-webhooks'; | ||
import { HttpService } from '@nestjs/axios'; | ||
import { sign } from '@ballerine/common'; | ||
import { AnyRecord } from '@ballerine/common'; | ||
import { env } from '@/env'; | ||
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service'; | ||
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service'; | ||
|
||
@common.Injectable() | ||
export abstract class WebhookHttpService extends HttpService {} | ||
export class WebhookHttpService {} | ||
|
||
@common.Injectable() | ||
@common.UseInterceptors(SentryInterceptor) | ||
export class WebhookManagerService { | ||
constructor( | ||
private readonly cls: ClsService, | ||
protected readonly logger: AppLoggerService, | ||
protected readonly configService: ConfigService, | ||
protected readonly httpService: WebhookHttpService, | ||
protected readonly outgoingQueueWebhookService: OutgoingWebhookQueueService, | ||
protected readonly outgoingWebhookService: OutgoingWebhooksService, | ||
protected readonly webhookEventEmitter: WebhookEventEmitterService, | ||
) { | ||
webhookEventEmitter.on('*', async (eventData: any) => { | ||
|
@@ -45,21 +45,34 @@ export class WebhookManagerService { | |
webhook: Webhook; | ||
webhookSharedSecret: string; | ||
}) { | ||
try { | ||
const { id, url, environment, apiVersion } = webhook; | ||
const { id, url, environment, apiVersion } = webhook; | ||
|
||
if (env.QUEUE_SYSTEM_ENABLED) { | ||
return await this.outgoingQueueWebhookService.addJob({ | ||
url, | ||
method: 'POST', | ||
headers: {}, | ||
body: data as unknown as AnyRecord, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Avoid type assertion with unknown. The type assertion -body: data as unknown as AnyRecord,
+body: validateAndTransformData(data),
+function validateAndTransformData<T>(data: T): AnyRecord {
+ // Add validation logic here
+ return data as AnyRecord;
+} Also applies to: 68-68 |
||
timeout: 15_000, | ||
secret: webhookSharedSecret, | ||
}); | ||
} | ||
|
||
try { | ||
this.logger.log('Sending webhook', { id, url }); | ||
|
||
const res = await lastValueFrom( | ||
this.httpService.post(url, data, { | ||
headers: { | ||
'X-HMAC-Signature': sign({ | ||
payload: data, | ||
key: webhookSharedSecret, | ||
}), | ||
}, | ||
}), | ||
); | ||
const response = await this.outgoingWebhookService.invokeWebhook({ | ||
url, | ||
method: 'POST', | ||
headers: {}, | ||
body: data as unknown as AnyRecord, | ||
timeout: 15_000, | ||
secret: webhookSharedSecret, | ||
}); | ||
|
||
if (response.status < 200 || response.status >= 300) { | ||
throw new Error(`Webhook failed with status ${response.status} for ${url}`); | ||
} | ||
} catch (error: Error | any) { | ||
this.logger.error('Webhook error data', { | ||
data, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import { BullModule, RegisterQueueOptions } from '@nestjs/bullmq'; | ||
import { Module } from '@nestjs/common'; | ||
import { ConfigModule, ConfigService } from '@nestjs/config'; | ||
|
||
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; | ||
import { ExpressAdapter } from '@bull-board/express'; | ||
import { BullBoardModule } from '@bull-board/nestjs'; | ||
|
||
import { QUEUES } from '@/bull-mq/consts'; | ||
import { IncomingWebhookQueueService } from '@/bull-mq/queues/incoming-webhook-queue.service'; | ||
import { OutgoingWebhookQueueService } from '@/bull-mq/queues/outgoing-webhook-queue.service'; | ||
import { AppLoggerModule } from '@/common/app-logger/app-logger.module'; | ||
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module'; | ||
|
||
@Module({ | ||
imports: [ | ||
AppLoggerModule, | ||
ConfigModule, | ||
OutgoingWebhooksModule, | ||
// Register bull & init redis connection | ||
BullModule.forRootAsync({ | ||
imports: [ConfigModule], | ||
useFactory: async (configService: ConfigService) => ({ | ||
connection: { | ||
host: configService.get('REDIS_HOST'), | ||
port: configService.get('REDIS_PORT'), | ||
password: configService.get('REDIS_PASSWORD'), | ||
}, | ||
}), | ||
inject: [ConfigService], | ||
}), | ||
// Register bull board module at /api/queues | ||
BullBoardModule.forRoot({ | ||
route: '/queues', | ||
adapter: ExpressAdapter, | ||
}), | ||
Comment on lines
+33
to
+36
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Secure access to Bull Board. |
||
// Register queues and pass config to bull board forFeature | ||
...Object.values(QUEUES).flatMap(queue => { | ||
const queues: Array<Omit<RegisterQueueOptions, 'name'> & { name: string }> = [ | ||
{ name: queue.name, ...queue.config }, | ||
]; | ||
|
||
if ('dlq' in queue) { | ||
queues.push({ name: queue.dlq }); | ||
} | ||
|
||
return queues.flatMap(queue => [ | ||
BullModule.registerQueue(queue), | ||
BullBoardModule.forFeature({ name: queue.name, adapter: BullMQAdapter }), | ||
]); | ||
}), | ||
], | ||
providers: [OutgoingWebhookQueueService, IncomingWebhookQueueService], | ||
exports: [OutgoingWebhookQueueService, IncomingWebhookQueueService], | ||
}) | ||
export class BullMqModule {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import { BaseJobOptions } from 'bullmq/dist/esm/interfaces'; | ||
|
||
export const QUEUES = { | ||
DEFAULT: { | ||
name: 'default', | ||
dlq: 'default-dlq', | ||
config: { | ||
attempts: 15, | ||
backoff: { | ||
type: 'exponential', | ||
delay: 1000, | ||
}, | ||
}, | ||
}, | ||
INCOMING_WEBHOOKS_QUEUE: { | ||
name: 'incoming-webhook-queue', | ||
dlq: 'incoming-webhook-queue-dlq', | ||
config: { | ||
attempts: 10, | ||
backoff: { | ||
type: 'exponential', | ||
delay: 1000, | ||
}, | ||
}, | ||
}, | ||
OUTGOING_WEBHOOKS_QUEUE: { | ||
name: 'outgoing-webhook-queue', | ||
dlq: 'outgoing-webhook-queue-dlq', | ||
config: { | ||
attempts: 10, | ||
backoff: { | ||
type: 'exponential', | ||
delay: 1000, | ||
}, | ||
}, | ||
}, | ||
} satisfies Record<string, { name: string; dlq?: string; config: BaseJobOptions }>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Remove empty WebhookHttpService class.
The
WebhookHttpService
class appears to be empty and serves no purpose. Consider removing it since the webhook functionality has been moved to dedicated services.