Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/adoption/adoption.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
Logger,
NotFoundException,
ConflictException,
Optional,
} from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { EventsService } from '../events/events.service';
Expand All @@ -14,6 +15,7 @@ import {
} from '@prisma/client';
import { CreateAdoptionDto } from './dto/create-adoption.dto';
import { UpdateAdoptionStatusDto } from './dto/update-adoption-status.dto';
import { NotificationQueueService } from '../jobs/services/notification-queue.service';

/** Maps an AdoptionStatus to its corresponding EventType, if one exists. */
const ADOPTION_STATUS_EVENT_MAP: Partial<Record<AdoptionStatus, EventType>> = {
Expand All @@ -28,6 +30,8 @@ export class AdoptionService {
constructor(
private readonly prisma: PrismaService,
private readonly events: EventsService,
@Optional()
private readonly notificationQueueService?: NotificationQueueService,
) {}

/**
Expand Down Expand Up @@ -137,6 +141,35 @@ export class AdoptionService {
adopterId: updated.adopterId,
} satisfies Prisma.InputJsonValue,
});

// Best-effort: enqueue a notification email without blocking status updates.
if (this.notificationQueueService) {
try {
const adopter = await this.prisma.user.findUnique({
where: { id: updated.adopterId },
select: { email: true },
});

if (adopter?.email) {
await this.notificationQueueService.enqueueSendTransactionalEmail(
{
dto: {
to: adopter.email,
subject: `PetAd: Adoption ${dto.status}`,
text: `Hello! Your adoption has been updated to ${dto.status}.`,
},
metadata: { adoptionId, newStatus: dto.status },
},
);
}
} catch (error) {
const reason =
error instanceof Error ? error.message : String(error);
this.logger.error(
`Failed to enqueue adoption notification email | adoptionId=${adoptionId} | reason=${reason}`,
);
}
}
}

return updated;
Expand Down
2 changes: 2 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { LoggingModule } from './logging/logging.module';
import { HttpExceptionFilter } from './common/filters/http-exception.filter';
import { APP_INTERCEPTOR } from '@nestjs/core';
import { LoggingInterceptor } from './logging/logging.interceptor';
import { JobsModule } from './jobs/jobs.module';

@Module({
imports: [
Expand All @@ -29,6 +30,7 @@ import { LoggingInterceptor } from './logging/logging.interceptor';
AuthModule,
HealthModule,
LoggingModule,
JobsModule,

],

Expand Down
33 changes: 33 additions & 0 deletions src/custody/custody.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ import {
Injectable,
NotFoundException,
BadRequestException,
Optional,
} from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { EventsService } from '../events/events.service';
import { EscrowService } from '../escrow/escrow.service';
import { CreateCustodyDto } from './dto/create-custody.dto';
import { CustodyResponseDto } from './dto/custody-response.dto';
import { CustodyStatus } from '@prisma/client';
import { NotificationQueueService } from '../jobs/services/notification-queue.service';

@Injectable()
export class CustodyService {
constructor(
private readonly prisma: PrismaService,
private readonly eventsService: EventsService,
private readonly escrowService: EscrowService,
@Optional()
private readonly notificationQueueService?: NotificationQueueService,
) {}

async createCustody(
Expand Down Expand Up @@ -145,6 +149,35 @@ export class CustodyService {
},
});

// Best-effort: enqueue a notification email without blocking custody creation.
if (this.notificationQueueService) {
try {
const holder = await this.prisma.user.findUnique({
where: { id: custody.holderId },
select: { email: true },
});

if (holder?.email) {
await this.notificationQueueService.enqueueSendTransactionalEmail({
dto: {
to: holder.email,
subject: 'PetAd: Custody Agreement Started',
text: `Hello! Your custody agreement has started for pet ${custody.petId}.`,
},
metadata: { custodyId: custody.id, petId: custody.petId },
});
}
} catch (error) {
const reason =
error instanceof Error ? error.message : String(error);
// Intentionally using Nest logger semantics; don't fail request due to async email.
// eslint-disable-next-line no-console
console.error(
`Failed to enqueue custody notification email | custodyId=${custody.id} | reason=${reason}`,
);
}
}

return custody as CustodyResponseDto;
}
}
24 changes: 24 additions & 0 deletions src/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Global, Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { EmailModule } from '../email/email.module';
import { LoggingModule } from '../logging/logging.module';
import { NotificationProcessor } from './processors/notification.processor';
import { NotificationWorker } from './workers/notification.worker';
import { NotificationQueueService } from './services/notification-queue.service';

@Global()
@Module({
imports: [
ConfigModule, // Provides ConfigService to our queue/worker services
EmailModule,
LoggingModule,
],
providers: [
NotificationProcessor,
NotificationQueueService,
NotificationWorker,
],
exports: [NotificationQueueService],
})
export class JobsModule {}

82 changes: 82 additions & 0 deletions src/jobs/processors/notification.processor.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { Test } from '@nestjs/testing';
import { NotificationProcessor } from './notification.processor';
import { EmailService } from '../../email/email.service';
import { LoggingService } from '../../logging/logging.service';

describe('NotificationProcessor', () => {
const mockEmailService = {
sendTransactionalEmail: jest.fn(),
};

const mockLoggingService = {
log: jest.fn(),
};

let processor: NotificationProcessor;

beforeEach(async () => {
jest.clearAllMocks();

const moduleRef = await Test.createTestingModule({
providers: [
NotificationProcessor,
{ provide: EmailService, useValue: mockEmailService },
{ provide: LoggingService, useValue: mockLoggingService },
],
}).compile();

processor = moduleRef.get(NotificationProcessor);
});

it('sends transactional email on success', async () => {
mockEmailService.sendTransactionalEmail.mockResolvedValue({
messageId: 'msg-1',
});

await processor.process({
id: 'job-1',
name: 'send_transactional_email',
queueName: 'notifications',
data: {
dto: {
to: '[email protected]',
subject: 'Subject',
text: 'Text',
},
metadata: { adoptionId: 'adopt-1' },
},
} as any);

expect(mockEmailService.sendTransactionalEmail).toHaveBeenCalledWith({
to: '[email protected]',
subject: 'Subject',
text: 'Text',
});
});

it('throws descriptive error and logs on failure', async () => {
mockEmailService.sendTransactionalEmail.mockRejectedValue(
new Error('SMTP down'),
);
mockLoggingService.log.mockResolvedValue(null);

await expect(
processor.process({
id: 'job-2',
name: 'send_transactional_email',
queueName: 'notifications',
data: {
dto: {
to: '[email protected]',
subject: 'Subject',
text: 'Text',
},
metadata: { adoptionId: 'adopt-2' },
},
} as any),
).rejects.toThrow(/Notification job failed/);

expect(mockLoggingService.log).toHaveBeenCalled();
});
});

71 changes: 71 additions & 0 deletions src/jobs/processors/notification.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { Injectable, Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { EmailService } from '../../email/email.service';
import { LoggingService } from '../../logging/logging.service';
import type { SendTransactionalEmailDto } from '../../email/dto/send-transactional-email.dto';

export type SendTransactionalEmailJobPayload = {
dto: SendTransactionalEmailDto;
metadata?: Record<string, any>;
};

@Injectable()
export class NotificationProcessor {
private readonly logger = new Logger(NotificationProcessor.name);

constructor(
private readonly emailService: EmailService,
private readonly loggingService: LoggingService,
) {}

async process(
job: Job<SendTransactionalEmailJobPayload>,
): Promise<void> {
const payload = job.data;
const to = payload?.dto?.to;
const subject = payload?.dto?.subject;

try {
await this.emailService.sendTransactionalEmail(payload.dto);
return;
} catch (error) {
const reason =
error instanceof Error ? error.message : String(error ?? 'Unknown');

const descriptiveError = [
'Notification job failed',
`(queueJobId=${String(job.id)})`,
`(jobName=${String(job.name)})`,
`(to=${String(to)})`,
`(subject=${String(subject)})`,
payload?.metadata ? `(metadata=${JSON.stringify(payload.metadata)})` : '',
`reason=${reason}`,
]
.filter(Boolean)
.join(' ');

// Best-effort logging; the job failure should still surface via throwing.
try {
await this.loggingService.log({
level: 'ERROR',
action: 'NOTIFICATION_JOB_FAILED',
message: descriptiveError,
metadata: {
queue: job.queueName,
jobId: job.id,
jobName: job.name,
to,
subject,
...payload.metadata,
},
});
} catch {
// Intentionally ignore logging failures.
}

this.logger.error(descriptiveError);
throw new Error(descriptiveError);
}
}
}

34 changes: 34 additions & 0 deletions src/jobs/queues/queue.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import type { ConfigService } from '@nestjs/config';

export const NOTIFICATION_QUEUE_NAME = 'notifications';
export const SEND_TRANSACTIONAL_EMAIL_JOB_NAME = 'send_transactional_email';

export function getRedisConnection(configService: ConfigService) {
const redisUrl = configService.get<string>('REDIS_URL');
if (!redisUrl) {
throw new Error(
'Missing required env var `REDIS_URL` for BullMQ queues.',
);
}

return { url: redisUrl };
}

export function getQueueConcurrency(configService: ConfigService): number {
const raw = configService.get<string>('QUEUE_CONCURRENCY');
const parsed = raw ? Number.parseInt(raw, 10) : 5;
return Number.isFinite(parsed) && parsed > 0 ? parsed : 5;
}

export function getJobAttempts(configService: ConfigService): number {
const raw = configService.get<string>('JOB_ATTEMPTS');
const parsed = raw ? Number.parseInt(raw, 10) : 3;
return Number.isFinite(parsed) && parsed > 0 ? parsed : 3;
}

export function getJobBackoffDelay(configService: ConfigService): number {
const raw = configService.get<string>('JOB_BACKOFF_DELAY');
const parsed = raw ? Number.parseInt(raw, 10) : 5000;
return Number.isFinite(parsed) && parsed >= 0 ? parsed : 5000;
}

Loading
Loading