diff --git a/app/backend/src/jobs/jobs-monitor.service.ts b/app/backend/src/jobs/jobs-monitor.service.ts new file mode 100644 index 0000000..78b9bf8 --- /dev/null +++ b/app/backend/src/jobs/jobs-monitor.service.ts @@ -0,0 +1,54 @@ +import { Injectable } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; +import { ConfigService } from '@nestjs/config'; + +@Injectable() +export class JobsMonitorService { + private readonly deadLetterQueue: Queue; + + constructor( + @InjectQueue('verification') private readonly verificationQueue: Queue, + @InjectQueue('notifications') private readonly notificationsQueue: Queue, + @InjectQueue('onchain') private readonly onchainQueue: Queue, + private readonly configService: ConfigService, + ) { + this.deadLetterQueue = new Queue('onchain-dead-letter', { + connection: { + host: this.configService.get('REDIS_HOST') || 'localhost', + port: parseInt(this.configService.get('REDIS_PORT') || '6379', 10), + }, + }); + } + + async getStatus() { + return { + verification: await this.getQueueStatus(this.verificationQueue), + notifications: await this.getQueueStatus(this.notificationsQueue), + onchain: await this.getQueueStatus(this.onchainQueue), + onchainDeadLetter: await this.getQueueStatus(this.deadLetterQueue), + }; + } + + private async getQueueStatus(queue: Queue) { + const [isReady, waiting, active, completed, failed, delayed] = + await Promise.all([ + queue.isReady(), + queue.getWaitingCount(), + queue.getActiveCount(), + queue.getCompletedCount(), + queue.getFailedCount(), + queue.getDelayedCount(), + ]); + + return { + name: queue.name, + isReady, + waiting, + active, + completed, + failed, + delayed, + }; + } +} diff --git a/app/backend/src/jobs/jobs.controller.ts b/app/backend/src/jobs/jobs.controller.ts index 4dbd123..ee3cc74 100644 --- a/app/backend/src/jobs/jobs.controller.ts +++ b/app/backend/src/jobs/jobs.controller.ts @@ -1,43 +1,15 @@ import { Controller, Get } from '@nestjs/common'; -import { InjectQueue } from '@nestjs/bullmq'; -import { Queue } from 'bullmq'; import { ApiTags, ApiOperation } from '@nestjs/swagger'; +import { JobsMonitorService } from './jobs-monitor.service'; @ApiTags('Jobs') @Controller('jobs') export class JobsController { - constructor( - @InjectQueue('verification') private verificationQueue: Queue, - @InjectQueue('notifications') private notificationsQueue: Queue, - @InjectQueue('onchain') private onchainQueue: Queue, - ) {} + constructor(private readonly jobsMonitorService: JobsMonitorService) {} @ApiOperation({ summary: 'Get status of all background job queues' }) - @Get('status') + @Get(['status', 'health']) async getStatus() { - return { - verification: await this.getQueueStatus(this.verificationQueue), - notifications: await this.getQueueStatus(this.notificationsQueue), - onchain: await this.getQueueStatus(this.onchainQueue), - }; - } - - private async getQueueStatus(queue: Queue) { - const [waiting, active, completed, failed, delayed] = await Promise.all([ - queue.getWaitingCount(), - queue.getActiveCount(), - queue.getCompletedCount(), - queue.getFailedCount(), - queue.getDelayedCount(), - ]); - - return { - name: queue.name, - waiting, - active, - completed, - failed, - delayed, - }; + return this.jobsMonitorService.getStatus(); } } diff --git a/app/backend/src/jobs/jobs.module.ts b/app/backend/src/jobs/jobs.module.ts index 2994df9..e7dab4e 100644 --- a/app/backend/src/jobs/jobs.module.ts +++ b/app/backend/src/jobs/jobs.module.ts @@ -1,6 +1,7 @@ import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bullmq'; import { JobsController } from './jobs.controller'; +import { JobsMonitorService } from './jobs-monitor.service'; @Module({ imports: [ @@ -9,5 +10,7 @@ import { JobsController } from './jobs.controller'; BullModule.registerQueue({ name: 'onchain' }), ], controllers: [JobsController], + providers: [JobsMonitorService], + exports: [JobsMonitorService], }) export class JobsModule {} diff --git a/app/backend/src/onchain/onchain.module.ts b/app/backend/src/onchain/onchain.module.ts index 9b60453..ccb13ef 100644 --- a/app/backend/src/onchain/onchain.module.ts +++ b/app/backend/src/onchain/onchain.module.ts @@ -48,6 +48,36 @@ const onchainAdapterProvider: Provider = { host: configService.get('REDIS_HOST') || 'localhost', port: parseInt(configService.get('REDIS_PORT') || '6379'), }, + defaultJobOptions: { + attempts: 5, + backoff: { + type: 'exponential', + delay: 10000, + }, + removeOnComplete: { + count: 100, + age: 7 * 24 * 60 * 60, + }, + removeOnFail: { + count: 50, + age: 7 * 24 * 60 * 60, + }, + }, + }), + inject: [ConfigService], + }), + BullModule.registerQueueAsync({ + name: 'onchain-dead-letter', + imports: [ConfigModule], + useFactory: (configService: ConfigService) => ({ + connection: { + host: configService.get('REDIS_HOST') || 'localhost', + port: parseInt(configService.get('REDIS_PORT') || '6379'), + }, + defaultJobOptions: { + removeOnComplete: false, + removeOnFail: false, + }, }), inject: [ConfigService], }), diff --git a/app/backend/src/onchain/onchain.processor.ts b/app/backend/src/onchain/onchain.processor.ts index 5f29b64..1be667e 100644 --- a/app/backend/src/onchain/onchain.processor.ts +++ b/app/backend/src/onchain/onchain.processor.ts @@ -1,7 +1,8 @@ /* eslint-disable @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-argument */ import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; import { Logger, Inject } from '@nestjs/common'; -import { Job } from 'bullmq'; +import { Job, Queue } from 'bullmq'; +import { ConfigService } from '@nestjs/config'; import { OnchainJobData, OnchainJobResult, @@ -14,12 +15,20 @@ import { ONCHAIN_ADAPTER_TOKEN, OnchainAdapter } from './onchain.adapter'; }) export class OnchainProcessor extends WorkerHost { private readonly logger = new Logger(OnchainProcessor.name); + private readonly deadLetterQueue: Queue; constructor( @Inject(ONCHAIN_ADAPTER_TOKEN) private readonly onchainAdapter: OnchainAdapter, + private readonly configService: ConfigService, ) { super(); + this.deadLetterQueue = new Queue('onchain-dead-letter', { + connection: { + host: this.configService.get('REDIS_HOST') || 'localhost', + port: parseInt(this.configService.get('REDIS_PORT') || '6379', 10), + }, + }); } async process( @@ -57,10 +66,21 @@ export class OnchainProcessor extends WorkerHost { metadata: result?.metadata, }; } catch (error) { - this.logger.error( - `Onchain job ${job.id} failed: ${error instanceof Error ? error.message : 'Unknown error'}`, - error instanceof Error ? error.stack : undefined, - ); + const message = + error instanceof Error ? error.message : 'Unknown onchain error'; + const stack = error instanceof Error ? error.stack : undefined; + + if (this.isTransientOnchainError(error)) { + this.logger.warn( + `Transient onchain error for job ${job.id}: ${message}`, + ); + } else { + this.logger.error( + `Onchain job ${job.id} failed: ${message}`, + stack, + ); + } + throw error; } } @@ -71,11 +91,65 @@ export class OnchainProcessor extends WorkerHost { } @OnWorkerEvent('failed') - onFailed(job: Job | undefined, error: Error) { + async onFailed(job: Job | undefined, error: Error) { if (job) { this.logger.error(`Onchain job ${job.id} failed: ${error.message}`); + if (this.isFinalFailure(job)) { + await this.moveToDeadLetterQueue(job, error); + } } else { this.logger.error(`Onchain job failed: ${error.message}`); } } + + private isTransientOnchainError(error: unknown): boolean { + if (!(error instanceof Error)) { + return false; + } + + const normalized = error.message.toLowerCase(); + return [ + 'timeout', + 'timed out', + 'network', + 'connection reset', + 'econnrefused', + 'econnreset', + 'ledger congestion', + 'rate limit', + 'rate-limited', + 'service unavailable', + 'unavailable', + '503', + '504', + ].some(token => normalized.includes(token)); + } + + private isFinalFailure(job: Job): boolean { + const totalAttempts = job.opts.attempts ?? 1; + return job.attemptsMade >= totalAttempts; + } + + private async moveToDeadLetterQueue(job: Job, error: Error) { + await this.deadLetterQueue.add( + `dead-letter-${job.id}`, + { + originalJobId: job.id, + originalName: job.name, + data: job.data, + failedAt: new Date().toISOString(), + failedReason: error.message, + attemptsMade: job.attemptsMade, + stack: error.stack, + }, + { + removeOnComplete: false, + removeOnFail: false, + }, + ); + + this.logger.warn( + `Moved job ${job.id} to onchain dead letter queue after ${job.attemptsMade} attempts`, + ); + } } diff --git a/app/backend/src/onchain/onchain.service.ts b/app/backend/src/onchain/onchain.service.ts index 1f2d0c3..ce0d289 100644 --- a/app/backend/src/onchain/onchain.service.ts +++ b/app/backend/src/onchain/onchain.service.ts @@ -37,7 +37,14 @@ export class OnchainService { type: 'exponential', delay: 10000, }, - removeOnComplete: true, + removeOnComplete: { + count: 100, + age: 7 * 24 * 60 * 60, + }, + removeOnFail: { + count: 50, + age: 7 * 24 * 60 * 60, + }, }); this.logger.log(`Enqueued onchain job: ${job.id} for ${type}`);