diff --git a/src/adoption/adoption.service.ts b/src/adoption/adoption.service.ts index 274c940..24a4629 100644 --- a/src/adoption/adoption.service.ts +++ b/src/adoption/adoption.service.ts @@ -3,6 +3,7 @@ import { Logger, NotFoundException, ConflictException, + Optional, } from '@nestjs/common'; import { PrismaService } from '../prisma/prisma.service'; import { EventsService } from '../events/events.service'; @@ -14,6 +15,7 @@ import { } from '@prisma/client'; import { CreateAdoptionDto } from './dto/create-adoption.dto'; import { UpdateAdoptionStatusDto } from './dto/update-adoption-status.dto'; +import { NotificationQueueService } from '../jobs/services/notification-queue.service'; /** Maps an AdoptionStatus to its corresponding EventType, if one exists. */ const ADOPTION_STATUS_EVENT_MAP: Partial> = { @@ -28,6 +30,8 @@ export class AdoptionService { constructor( private readonly prisma: PrismaService, private readonly events: EventsService, + @Optional() + private readonly notificationQueueService?: NotificationQueueService, ) {} /** @@ -137,6 +141,35 @@ export class AdoptionService { adopterId: updated.adopterId, } satisfies Prisma.InputJsonValue, }); + + // Best-effort: enqueue a notification email without blocking status updates. + if (this.notificationQueueService) { + try { + const adopter = await this.prisma.user.findUnique({ + where: { id: updated.adopterId }, + select: { email: true }, + }); + + if (adopter?.email) { + await this.notificationQueueService.enqueueSendTransactionalEmail( + { + dto: { + to: adopter.email, + subject: `PetAd: Adoption ${dto.status}`, + text: `Hello! Your adoption has been updated to ${dto.status}.`, + }, + metadata: { adoptionId, newStatus: dto.status }, + }, + ); + } + } catch (error) { + const reason = + error instanceof Error ? error.message : String(error); + this.logger.error( + `Failed to enqueue adoption notification email | adoptionId=${adoptionId} | reason=${reason}`, + ); + } + } } return updated; diff --git a/src/app.module.ts b/src/app.module.ts index df5ab83..874684a 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -15,6 +15,7 @@ import { LoggingModule } from './logging/logging.module'; import { HttpExceptionFilter } from './common/filters/http-exception.filter'; import { APP_INTERCEPTOR } from '@nestjs/core'; import { LoggingInterceptor } from './logging/logging.interceptor'; +import { JobsModule } from './jobs/jobs.module'; @Module({ imports: [ @@ -29,6 +30,7 @@ import { LoggingInterceptor } from './logging/logging.interceptor'; AuthModule, HealthModule, LoggingModule, + JobsModule, ], diff --git a/src/custody/custody.service.ts b/src/custody/custody.service.ts index 7775f71..a798d26 100644 --- a/src/custody/custody.service.ts +++ b/src/custody/custody.service.ts @@ -2,6 +2,7 @@ import { Injectable, NotFoundException, BadRequestException, + Optional, } from '@nestjs/common'; import { PrismaService } from '../prisma/prisma.service'; import { EventsService } from '../events/events.service'; @@ -9,6 +10,7 @@ import { EscrowService } from '../escrow/escrow.service'; import { CreateCustodyDto } from './dto/create-custody.dto'; import { CustodyResponseDto } from './dto/custody-response.dto'; import { CustodyStatus } from '@prisma/client'; +import { NotificationQueueService } from '../jobs/services/notification-queue.service'; @Injectable() export class CustodyService { @@ -16,6 +18,8 @@ export class CustodyService { private readonly prisma: PrismaService, private readonly eventsService: EventsService, private readonly escrowService: EscrowService, + @Optional() + private readonly notificationQueueService?: NotificationQueueService, ) {} async createCustody( @@ -145,6 +149,35 @@ export class CustodyService { }, }); + // Best-effort: enqueue a notification email without blocking custody creation. + if (this.notificationQueueService) { + try { + const holder = await this.prisma.user.findUnique({ + where: { id: custody.holderId }, + select: { email: true }, + }); + + if (holder?.email) { + await this.notificationQueueService.enqueueSendTransactionalEmail({ + dto: { + to: holder.email, + subject: 'PetAd: Custody Agreement Started', + text: `Hello! Your custody agreement has started for pet ${custody.petId}.`, + }, + metadata: { custodyId: custody.id, petId: custody.petId }, + }); + } + } catch (error) { + const reason = + error instanceof Error ? error.message : String(error); + // Intentionally using Nest logger semantics; don't fail request due to async email. + // eslint-disable-next-line no-console + console.error( + `Failed to enqueue custody notification email | custodyId=${custody.id} | reason=${reason}`, + ); + } + } + return custody as CustodyResponseDto; } } diff --git a/src/jobs/jobs.module.ts b/src/jobs/jobs.module.ts new file mode 100644 index 0000000..6b9dc72 --- /dev/null +++ b/src/jobs/jobs.module.ts @@ -0,0 +1,24 @@ +import { Global, Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { EmailModule } from '../email/email.module'; +import { LoggingModule } from '../logging/logging.module'; +import { NotificationProcessor } from './processors/notification.processor'; +import { NotificationWorker } from './workers/notification.worker'; +import { NotificationQueueService } from './services/notification-queue.service'; + +@Global() +@Module({ + imports: [ + ConfigModule, // Provides ConfigService to our queue/worker services + EmailModule, + LoggingModule, + ], + providers: [ + NotificationProcessor, + NotificationQueueService, + NotificationWorker, + ], + exports: [NotificationQueueService], +}) +export class JobsModule {} + diff --git a/src/jobs/processors/notification.processor.spec.ts b/src/jobs/processors/notification.processor.spec.ts new file mode 100644 index 0000000..a5b617c --- /dev/null +++ b/src/jobs/processors/notification.processor.spec.ts @@ -0,0 +1,82 @@ +import { Test } from '@nestjs/testing'; +import { NotificationProcessor } from './notification.processor'; +import { EmailService } from '../../email/email.service'; +import { LoggingService } from '../../logging/logging.service'; + +describe('NotificationProcessor', () => { + const mockEmailService = { + sendTransactionalEmail: jest.fn(), + }; + + const mockLoggingService = { + log: jest.fn(), + }; + + let processor: NotificationProcessor; + + beforeEach(async () => { + jest.clearAllMocks(); + + const moduleRef = await Test.createTestingModule({ + providers: [ + NotificationProcessor, + { provide: EmailService, useValue: mockEmailService }, + { provide: LoggingService, useValue: mockLoggingService }, + ], + }).compile(); + + processor = moduleRef.get(NotificationProcessor); + }); + + it('sends transactional email on success', async () => { + mockEmailService.sendTransactionalEmail.mockResolvedValue({ + messageId: 'msg-1', + }); + + await processor.process({ + id: 'job-1', + name: 'send_transactional_email', + queueName: 'notifications', + data: { + dto: { + to: 'to@example.com', + subject: 'Subject', + text: 'Text', + }, + metadata: { adoptionId: 'adopt-1' }, + }, + } as any); + + expect(mockEmailService.sendTransactionalEmail).toHaveBeenCalledWith({ + to: 'to@example.com', + subject: 'Subject', + text: 'Text', + }); + }); + + it('throws descriptive error and logs on failure', async () => { + mockEmailService.sendTransactionalEmail.mockRejectedValue( + new Error('SMTP down'), + ); + mockLoggingService.log.mockResolvedValue(null); + + await expect( + processor.process({ + id: 'job-2', + name: 'send_transactional_email', + queueName: 'notifications', + data: { + dto: { + to: 'to@example.com', + subject: 'Subject', + text: 'Text', + }, + metadata: { adoptionId: 'adopt-2' }, + }, + } as any), + ).rejects.toThrow(/Notification job failed/); + + expect(mockLoggingService.log).toHaveBeenCalled(); + }); +}); + diff --git a/src/jobs/processors/notification.processor.ts b/src/jobs/processors/notification.processor.ts new file mode 100644 index 0000000..315ef30 --- /dev/null +++ b/src/jobs/processors/notification.processor.ts @@ -0,0 +1,71 @@ +import { Injectable, Logger } from '@nestjs/common'; +import type { Job } from 'bullmq'; +import { EmailService } from '../../email/email.service'; +import { LoggingService } from '../../logging/logging.service'; +import type { SendTransactionalEmailDto } from '../../email/dto/send-transactional-email.dto'; + +export type SendTransactionalEmailJobPayload = { + dto: SendTransactionalEmailDto; + metadata?: Record; +}; + +@Injectable() +export class NotificationProcessor { + private readonly logger = new Logger(NotificationProcessor.name); + + constructor( + private readonly emailService: EmailService, + private readonly loggingService: LoggingService, + ) {} + + async process( + job: Job, + ): Promise { + const payload = job.data; + const to = payload?.dto?.to; + const subject = payload?.dto?.subject; + + try { + await this.emailService.sendTransactionalEmail(payload.dto); + return; + } catch (error) { + const reason = + error instanceof Error ? error.message : String(error ?? 'Unknown'); + + const descriptiveError = [ + 'Notification job failed', + `(queueJobId=${String(job.id)})`, + `(jobName=${String(job.name)})`, + `(to=${String(to)})`, + `(subject=${String(subject)})`, + payload?.metadata ? `(metadata=${JSON.stringify(payload.metadata)})` : '', + `reason=${reason}`, + ] + .filter(Boolean) + .join(' '); + + // Best-effort logging; the job failure should still surface via throwing. + try { + await this.loggingService.log({ + level: 'ERROR', + action: 'NOTIFICATION_JOB_FAILED', + message: descriptiveError, + metadata: { + queue: job.queueName, + jobId: job.id, + jobName: job.name, + to, + subject, + ...payload.metadata, + }, + }); + } catch { + // Intentionally ignore logging failures. + } + + this.logger.error(descriptiveError); + throw new Error(descriptiveError); + } + } +} + diff --git a/src/jobs/queues/queue.config.ts b/src/jobs/queues/queue.config.ts new file mode 100644 index 0000000..4f1c3a8 --- /dev/null +++ b/src/jobs/queues/queue.config.ts @@ -0,0 +1,34 @@ +import type { ConfigService } from '@nestjs/config'; + +export const NOTIFICATION_QUEUE_NAME = 'notifications'; +export const SEND_TRANSACTIONAL_EMAIL_JOB_NAME = 'send_transactional_email'; + +export function getRedisConnection(configService: ConfigService) { + const redisUrl = configService.get('REDIS_URL'); + if (!redisUrl) { + throw new Error( + 'Missing required env var `REDIS_URL` for BullMQ queues.', + ); + } + + return { url: redisUrl }; +} + +export function getQueueConcurrency(configService: ConfigService): number { + const raw = configService.get('QUEUE_CONCURRENCY'); + const parsed = raw ? Number.parseInt(raw, 10) : 5; + return Number.isFinite(parsed) && parsed > 0 ? parsed : 5; +} + +export function getJobAttempts(configService: ConfigService): number { + const raw = configService.get('JOB_ATTEMPTS'); + const parsed = raw ? Number.parseInt(raw, 10) : 3; + return Number.isFinite(parsed) && parsed > 0 ? parsed : 3; +} + +export function getJobBackoffDelay(configService: ConfigService): number { + const raw = configService.get('JOB_BACKOFF_DELAY'); + const parsed = raw ? Number.parseInt(raw, 10) : 5000; + return Number.isFinite(parsed) && parsed >= 0 ? parsed : 5000; +} + diff --git a/src/jobs/services/notification-queue.service.ts b/src/jobs/services/notification-queue.service.ts new file mode 100644 index 0000000..16910fb --- /dev/null +++ b/src/jobs/services/notification-queue.service.ts @@ -0,0 +1,52 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { Queue } from 'bullmq'; +import { ConfigService } from '@nestjs/config'; +import type { Job } from 'bullmq'; +import { + NOTIFICATION_QUEUE_NAME, + SEND_TRANSACTIONAL_EMAIL_JOB_NAME, + getJobAttempts, + getJobBackoffDelay, + getRedisConnection, +} from '../queues/queue.config'; +import type { SendTransactionalEmailDto } from '../../email/dto/send-transactional-email.dto'; + +export type SendTransactionalEmailJobInput = { + dto: SendTransactionalEmailDto; + metadata?: Record; +}; + +@Injectable() +export class NotificationQueueService implements OnModuleDestroy { + private readonly queue: Queue; + + constructor(private readonly configService: ConfigService) { + const connection = getRedisConnection(configService); + const attempts = getJobAttempts(configService); + const backoffDelay = getJobBackoffDelay(configService); + + // Default job options ensure retry/backoff behavior even if callers + // enqueue jobs without explicit overrides. + this.queue = new Queue(NOTIFICATION_QUEUE_NAME, { + connection, + defaultJobOptions: { + attempts, + backoff: { type: 'fixed', delay: backoffDelay }, + }, + }); + } + + async enqueueSendTransactionalEmail( + input: SendTransactionalEmailJobInput, + ): Promise> { + return this.queue.add( + SEND_TRANSACTIONAL_EMAIL_JOB_NAME, + input, + ); + } + + async onModuleDestroy(): Promise { + await this.queue.close(); + } +} + diff --git a/src/jobs/workers/notification.worker.ts b/src/jobs/workers/notification.worker.ts new file mode 100644 index 0000000..2861371 --- /dev/null +++ b/src/jobs/workers/notification.worker.ts @@ -0,0 +1,51 @@ +import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; +import { Worker } from 'bullmq'; +import type { Job } from 'bullmq'; +import { ConfigService } from '@nestjs/config'; +import { getRedisConnection } from '../queues/queue.config'; +import { + NOTIFICATION_QUEUE_NAME, + SEND_TRANSACTIONAL_EMAIL_JOB_NAME, + getQueueConcurrency, +} from '../queues/queue.config'; +import { NotificationProcessor } from '../processors/notification.processor'; + +@Injectable() +export class NotificationWorker + implements OnModuleInit, OnModuleDestroy +{ + private worker: Worker | null = null; + + constructor( + private readonly configService: ConfigService, + private readonly notificationProcessor: NotificationProcessor, + ) {} + + async onModuleInit() { + const connection = getRedisConnection(this.configService); + const concurrency = getQueueConcurrency(this.configService); + + // The processor is responsible for throwing descriptive errors. + this.worker = new Worker( + NOTIFICATION_QUEUE_NAME, + async (job: Job) => { + if (job.name !== SEND_TRANSACTIONAL_EMAIL_JOB_NAME) { + // Misconfigured payload should surface loudly. + throw new Error( + `Unexpected job name "${job.name}" for queue "${NOTIFICATION_QUEUE_NAME}"`, + ); + } + await this.notificationProcessor.process(job); + }, + { + connection, + concurrency, + }, + ); + } + + async onModuleDestroy() { + await this.worker?.close(); + } +} +