diff --git a/apps/api/src/webhook-support/1700000000000-CreateWebhookTables.ts b/apps/api/src/webhook-support/1700000000000-CreateWebhookTables.ts new file mode 100644 index 0000000..0adf839 --- /dev/null +++ b/apps/api/src/webhook-support/1700000000000-CreateWebhookTables.ts @@ -0,0 +1,114 @@ +import { MigrationInterface, QueryRunner, Table, TableIndex } from 'typeorm'; + +export class CreateWebhookTables1700000000000 implements MigrationInterface { + name = 'CreateWebhookTables1700000000000'; + + // ────────────────────────────────────────────────────────────────────────── + // UP + // ────────────────────────────────────────────────────────────────────────── + async up(queryRunner: QueryRunner): Promise { + // ── webhooks ──────────────────────────────────────────────────────────── + await queryRunner.createTable( + new Table({ + name: 'webhooks', + columns: [ + { + name: 'id', + type: 'uuid', + isPrimary: true, + generationStrategy: 'uuid', + default: 'gen_random_uuid()', + }, + { name: 'name', type: 'varchar', length: '255' }, + { name: 'url', type: 'text' }, + { name: 'secret', type: 'text' }, + { + name: 'events', + type: 'text', + // TypeORM simple-array uses comma-separated text + default: "''", + }, + { name: 'isActive', type: 'boolean', default: true }, + { name: 'maxRetries', type: 'int', default: 5 }, + { name: 'description', type: 'text', isNullable: true }, + { + name: 'createdAt', + type: 'timestamptz', + default: 'now()', + }, + { + name: 'updatedAt', + type: 'timestamptz', + default: 'now()', + }, + ], + }), + true, + ); + + // ── webhook_deliveries ────────────────────────────────────────────────── + await queryRunner.createTable( + new Table({ + name: 'webhook_deliveries', + columns: [ + { + name: 'id', + type: 'uuid', + isPrimary: true, + generationStrategy: 'uuid', + default: 'gen_random_uuid()', + }, + { name: 'webhookId', type: 'uuid' }, + { name: 'event', type: 'varchar' }, + { name: 'payload', type: 'jsonb' }, + { name: 'status', type: 'varchar', default: "'pending'" }, + { name: 'responseStatus', type: 'int', isNullable: true }, + { name: 'responseBody', type: 'text', isNullable: true }, + { name: 'errorMessage', type: 'text', isNullable: true }, + { name: 'attempt', type: 'int', default: 0 }, + { name: 'nextRetryAt', type: 'timestamptz', isNullable: true }, + { name: 'deliveredAt', type: 'timestamptz', isNullable: true }, + { + name: 'createdAt', + type: 'timestamptz', + default: 'now()', + }, + ], + foreignKeys: [ + { + columnNames: ['webhookId'], + referencedTableName: 'webhooks', + referencedColumnNames: ['id'], + onDelete: 'CASCADE', + }, + ], + }), + true, + ); + + // ── indexes ───────────────────────────────────────────────────────────── + await queryRunner.createIndex( + 'webhook_deliveries', + new TableIndex({ + name: 'IDX_webhook_deliveries_webhookId_createdAt', + columnNames: ['webhookId', 'createdAt'], + }), + ); + + await queryRunner.createIndex( + 'webhook_deliveries', + new TableIndex({ + name: 'IDX_webhook_deliveries_status', + columnNames: ['status'], + }), + ); + } + + // ────────────────────────────────────────────────────────────────────────── + // DOWN + // ────────────────────────────────────────────────────────────────────────── + async down(queryRunner: QueryRunner): Promise { + await queryRunner.dropTable('webhook_deliveries', true); + await queryRunner.dropTable('webhooks', true); + } +} diff --git a/apps/api/src/webhook-support/dispatch-usage.example.ts b/apps/api/src/webhook-support/dispatch-usage.example.ts new file mode 100644 index 0000000..ee32711 --- /dev/null +++ b/apps/api/src/webhook-support/dispatch-usage.example.ts @@ -0,0 +1,70 @@ +/** + * Usage Example — Dispatching webhook events from other BridgeWise services + * + * Drop this pattern into any service that wants to emit webhook events. + * No circular dependency: WebhookModule exports WebhookService so any + * feature module can import it freely. + */ + +import { Injectable } from '@nestjs/common'; +import { WebhookEvent, WebhookService } from '../index'; // adjust path + +// ── Example: Gas alert service emitting webhook events ──────────────────────── + +@Injectable() +export class GasAlertService { + constructor( + // ... your other deps + private readonly webhooks: WebhookService, + ) {} + + async handleSpikeDetected(chainId: string, gweiValue: number): Promise { + // ... your existing logic + + // Fire & forget — webhook delivery is async via BullMQ + await this.webhooks.dispatch({ + event: WebhookEvent.GAS_SPIKE_DETECTED, + data: { + chainId, + gweiValue, + detectedAt: new Date().toISOString(), + }, + }); + } + + async handleGasNormalized(chainId: string, gweiValue: number): Promise { + // ... your existing logic + + await this.webhooks.dispatch({ + event: WebhookEvent.GAS_NORMALIZED, + data: { chainId, gweiValue }, + }); + } +} + +// ── AppModule wiring ────────────────────────────────────────────────────────── +// +// In your AppModule (or the feature module): +// +// @Module({ +// imports: [ +// WebhookModule, // <-- add this +// GasAlertModule, +// BullModule.forRootAsync({ +// useFactory: (config: ConfigService) => ({ +// connection: { +// host: config.get('REDIS_HOST', 'localhost'), +// port: config.get('REDIS_PORT', 6379), +// }, +// }), +// inject: [ConfigService], +// }), +// ], +// }) +// export class AppModule {} + +// ── Required environment variables ─────────────────────────────────────────── +// +// REDIS_HOST=localhost +// REDIS_PORT=6379 +// WEBHOOK_ADMIN_SECRET= # guards POST /webhooks/dispatch diff --git a/apps/api/src/webhook-support/webhook-admin.guard.spec.ts b/apps/api/src/webhook-support/webhook-admin.guard.spec.ts new file mode 100644 index 0000000..e0d69bb --- /dev/null +++ b/apps/api/src/webhook-support/webhook-admin.guard.spec.ts @@ -0,0 +1,56 @@ +import { ExecutionContext, UnauthorizedException } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { WebhookAdminGuard } from '../guards/webhook-admin.guard'; + +const makeContext = (authHeader?: string): ExecutionContext => + ({ + switchToHttp: () => ({ + getRequest: () => ({ + headers: authHeader ? { authorization: authHeader } : {}, + }), + }), + } as unknown as ExecutionContext); + +const makeGuard = (secret = 'admin-secret-key') => { + const config = { get: jest.fn().mockReturnValue(secret) } as unknown as ConfigService; + return new WebhookAdminGuard(config); +}; + +describe('WebhookAdminGuard', () => { + it('allows a request with the correct Bearer token', () => { + const guard = makeGuard('my-admin-secret'); + expect(guard.canActivate(makeContext('Bearer my-admin-secret'))).toBe(true); + }); + + it('throws when the Authorization header is missing', () => { + expect(() => makeGuard().canActivate(makeContext())).toThrow(UnauthorizedException); + }); + + it('throws when the scheme is not Bearer', () => { + expect(() => makeGuard().canActivate(makeContext('Basic dXNlcjpwYXNz'))).toThrow( + UnauthorizedException, + ); + }); + + it('throws when the token does not match', () => { + const guard = makeGuard('correct-secret'); + expect(() => guard.canActivate(makeContext('Bearer wrong-token'))).toThrow( + UnauthorizedException, + ); + }); + + it('throws when WEBHOOK_ADMIN_SECRET is not set', () => { + const config = { get: jest.fn().mockReturnValue('') } as unknown as ConfigService; + const guard = new WebhookAdminGuard(config); + expect(() => guard.canActivate(makeContext('Bearer anything'))).toThrow( + UnauthorizedException, + ); + }); + + it('handles tokens of different lengths without throwing (timing-safe)', () => { + const guard = makeGuard('short'); + expect(() => + guard.canActivate(makeContext('Bearer this-is-a-much-longer-token-than-expected')), + ).toThrow(UnauthorizedException); + }); +}); diff --git a/apps/api/src/webhook-support/webhook-admin.guard.ts b/apps/api/src/webhook-support/webhook-admin.guard.ts new file mode 100644 index 0000000..98325f2 --- /dev/null +++ b/apps/api/src/webhook-support/webhook-admin.guard.ts @@ -0,0 +1,59 @@ +import { + CanActivate, + ExecutionContext, + Injectable, + UnauthorizedException, +} from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Request } from 'express'; +import { timingSafeEqual } from 'crypto'; + +/** + * WebhookAdminGuard + * ───────────────── + * Protects the internal /webhooks/dispatch endpoint (and any other admin + * webhook routes) behind a pre-shared API key. + * + * The key is read from the environment variable WEBHOOK_ADMIN_SECRET. + * Consumers must supply it as: + * + * Authorization: Bearer + * + * Usage — apply per route: + * @UseGuards(WebhookAdminGuard) + * @Post('dispatch') + * dispatch(@Body() dto: DispatchEventDto) { ... } + * + * Or globally in AppModule / a dedicated admin module guard. + */ +@Injectable() +export class WebhookAdminGuard implements CanActivate { + constructor(private readonly config: ConfigService) {} + + canActivate(context: ExecutionContext): boolean { + const request = context.switchToHttp().getRequest(); + const authHeader = request.headers['authorization'] ?? ''; + + if (!authHeader.startsWith('Bearer ')) { + throw new UnauthorizedException('Missing admin Bearer token'); + } + + const token = authHeader.slice(7); + const expected = this.config.get('WEBHOOK_ADMIN_SECRET', ''); + + if (!expected) { + throw new UnauthorizedException('WEBHOOK_ADMIN_SECRET is not configured'); + } + + let match = false; + try { + match = timingSafeEqual(Buffer.from(token), Buffer.from(expected)); + } catch { + // Buffers differ in length + match = false; + } + + if (!match) throw new UnauthorizedException('Invalid admin token'); + return true; + } +} diff --git a/apps/api/src/webhook-support/webhook-delivery.service.spec.ts b/apps/api/src/webhook-support/webhook-delivery.service.spec.ts new file mode 100644 index 0000000..50b3b79 --- /dev/null +++ b/apps/api/src/webhook-support/webhook-delivery.service.spec.ts @@ -0,0 +1,167 @@ +import { HttpService } from '@nestjs/axios'; +import { Test, TestingModule } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { AxiosResponse } from 'axios'; +import { of, throwError } from 'rxjs'; +import { WebhookDelivery } from '../entities/webhook-delivery.entity'; +import { DeliveryStatus } from '../enums/delivery-status.enum'; +import { WebhookEvent } from '../enums/webhook-event.enum'; +import { WebhookDeliveryJobData } from '../interfaces/webhook-payload.interface'; +import { WebhookDeliveryService } from '../webhook-delivery.service'; +import { WebhookSignatureService } from '../webhook-signature.service'; + +const makeJob = (overrides: Partial = {}): WebhookDeliveryJobData => ({ + webhookId: 'wh-1', + deliveryId: 'del-1', + url: 'https://consumer.example.com/hook', + secret: 'super-secret-key-16chars', + attempt: 0, + payload: { + id: 'payload-uuid', + event: WebhookEvent.PING, + createdAt: new Date().toISOString(), + data: {}, + }, + ...overrides, +}); + +const makeAxiosResponse = (status: number, data: unknown = {}): AxiosResponse => + ({ + status, + data, + headers: {}, + config: { headers: {} } as any, + statusText: String(status), + } as AxiosResponse); + +describe('WebhookDeliveryService', () => { + let service: WebhookDeliveryService; + let httpService: jest.Mocked; + let deliveryRepo: { update: jest.Mock }; + let signatureService: WebhookSignatureService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + WebhookDeliveryService, + WebhookSignatureService, + { + provide: HttpService, + useValue: { post: jest.fn() }, + }, + { + provide: getRepositoryToken(WebhookDelivery), + useValue: { update: jest.fn().mockResolvedValue(undefined) }, + }, + ], + }).compile(); + + service = module.get(WebhookDeliveryService); + httpService = module.get(HttpService); + deliveryRepo = module.get(getRepositoryToken(WebhookDelivery)); + signatureService = module.get(WebhookSignatureService); + }); + + afterEach(() => jest.clearAllMocks()); + + // ── successful delivery ─────────────────────────────────────────────────── + + describe('deliver — success path', () => { + it('sends POST with correct headers and marks delivery as SUCCESS', async () => { + const job = makeJob(); + httpService.post.mockReturnValue(of(makeAxiosResponse(200, { ok: true }))); + + await service.deliver(job); + + expect(httpService.post).toHaveBeenCalledWith( + job.url, + job.payload, + expect.objectContaining({ + headers: expect.objectContaining({ + 'Content-Type': 'application/json', + 'X-BridgeWise-Delivery': 'del-1', + 'X-BridgeWise-Event': WebhookEvent.PING, + }), + }), + ); + + expect(deliveryRepo.update).toHaveBeenLastCalledWith( + 'del-1', + expect.objectContaining({ status: DeliveryStatus.SUCCESS }), + ); + }); + + it('includes a valid HMAC-SHA256 signature header', async () => { + const job = makeJob(); + httpService.post.mockReturnValue(of(makeAxiosResponse(201))); + + await service.deliver(job); + + const callArgs = httpService.post.mock.calls[0]; + const headers = callArgs[2].headers as Record; + const sig = headers['X-BridgeWise-Signature']; + + const expectedSig = signatureService.sign( + JSON.stringify(job.payload), + job.secret, + ); + expect(sig).toBe(expectedSig); + }); + + it('accepts any 2xx status code', async () => { + for (const status of [200, 201, 202, 204]) { + deliveryRepo.update.mockResolvedValue(undefined); + httpService.post.mockReturnValue(of(makeAxiosResponse(status))); + await expect(service.deliver(makeJob())).resolves.not.toThrow(); + } + }); + }); + + // ── failed delivery (HTTP error) ────────────────────────────────────────── + + describe('deliver — HTTP error path', () => { + it('throws and marks delivery FAILED on non-2xx response', async () => { + const job = makeJob(); + httpService.post.mockReturnValue(of(makeAxiosResponse(500, 'Internal Server Error'))); + + await expect(service.deliver(job)).rejects.toThrow(/500/); + + expect(deliveryRepo.update).toHaveBeenLastCalledWith( + 'del-1', + expect.objectContaining({ status: DeliveryStatus.FAILED, responseStatus: 500 }), + ); + }); + + it('throws and records error message on network failure', async () => { + const job = makeJob(); + const networkError = Object.assign(new Error('ECONNREFUSED'), { isAxiosError: true }); + httpService.post.mockReturnValue(throwError(() => networkError)); + + await expect(service.deliver(job)).rejects.toThrow('ECONNREFUSED'); + + expect(deliveryRepo.update).toHaveBeenLastCalledWith( + 'del-1', + expect.objectContaining({ + status: DeliveryStatus.FAILED, + errorMessage: expect.stringContaining('ECONNREFUSED'), + }), + ); + }); + }); + + // ── markExhausted ───────────────────────────────────────────────────────── + + describe('markExhausted', () => { + it('sets status to EXHAUSTED with error summary', async () => { + await service.markExhausted('del-1', 'connection refused'); + + expect(deliveryRepo.update).toHaveBeenCalledWith( + 'del-1', + expect.objectContaining({ + status: DeliveryStatus.EXHAUSTED, + errorMessage: expect.stringContaining('connection refused'), + }), + ); + }); + }); +}); diff --git a/apps/api/src/webhook-support/webhook-delivery.service.ts b/apps/api/src/webhook-support/webhook-delivery.service.ts new file mode 100644 index 0000000..97f02d1 --- /dev/null +++ b/apps/api/src/webhook-support/webhook-delivery.service.ts @@ -0,0 +1,109 @@ +import { HttpService } from '@nestjs/axios'; +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { AxiosError } from 'axios'; +import { firstValueFrom, timeout } from 'rxjs'; +import { Repository } from 'typeorm'; +import { WebhookDelivery } from './entities/webhook-delivery.entity'; +import { DeliveryStatus } from './enums/delivery-status.enum'; +import { WebhookDeliveryJobData } from './interfaces/webhook-payload.interface'; +import { WebhookSignatureService } from './webhook-signature.service'; + +const DELIVERY_TIMEOUT_MS = 10_000; // 10 s +const MAX_RESPONSE_BODY = 2_048; // 2 KiB stored + +@Injectable() +export class WebhookDeliveryService { + private readonly logger = new Logger(WebhookDeliveryService.name); + + constructor( + private readonly httpService: HttpService, + private readonly signatureService: WebhookSignatureService, + @InjectRepository(WebhookDelivery) + private readonly deliveryRepo: Repository, + ) {} + + /** + * Executes one HTTP delivery attempt. + * Throws on failure so BullMQ can handle retries / backoff. + */ + async deliver(job: WebhookDeliveryJobData): Promise { + const { deliveryId, url, secret, payload, attempt } = job; + + const body = JSON.stringify(payload); + const signature = this.signatureService.sign(body, secret); + + this.logger.log( + `Attempting delivery ${deliveryId} → ${url} (attempt #${attempt + 1})`, + ); + + await this.deliveryRepo.update(deliveryId, { + status: DeliveryStatus.RETRYING, + attempt: attempt + 1, + }); + + try { + const response = await firstValueFrom( + this.httpService + .post(url, payload, { + headers: { + 'Content-Type': 'application/json', + 'X-BridgeWise-Signature': signature, + 'X-BridgeWise-Event': payload.event, + 'X-BridgeWise-Delivery': deliveryId, + }, + validateStatus: () => true, // handle all statuses ourselves + }) + .pipe(timeout(DELIVERY_TIMEOUT_MS)), + ); + + const responseBody = + typeof response.data === 'string' + ? response.data.slice(0, MAX_RESPONSE_BODY) + : JSON.stringify(response.data).slice(0, MAX_RESPONSE_BODY); + + const success = response.status >= 200 && response.status < 300; + + await this.deliveryRepo.update(deliveryId, { + status: success ? DeliveryStatus.SUCCESS : DeliveryStatus.FAILED, + responseStatus: response.status, + responseBody, + deliveredAt: success ? new Date() : null, + errorMessage: success + ? null + : `HTTP ${response.status}: ${responseBody}`, + }); + + if (!success) { + throw new Error( + `Webhook consumer returned ${response.status} for delivery ${deliveryId}`, + ); + } + + this.logger.log(`Delivery ${deliveryId} succeeded (HTTP ${response.status})`); + } catch (error) { + const isAxiosError = (error as AxiosError).isAxiosError; + const message = isAxiosError + ? `Network error: ${(error as AxiosError).message}` + : (error as Error).message; + + this.logger.warn(`Delivery ${deliveryId} failed: ${message}`); + + await this.deliveryRepo.update(deliveryId, { + status: DeliveryStatus.FAILED, + errorMessage: message.slice(0, 1_000), + }); + + throw error; // re-throw → BullMQ retries + } + } + + /** Mark a delivery as permanently exhausted (called by processor on final fail) */ + async markExhausted(deliveryId: string, errorMessage: string): Promise { + await this.deliveryRepo.update(deliveryId, { + status: DeliveryStatus.EXHAUSTED, + errorMessage: `Exhausted all retries. Last error: ${errorMessage}`.slice(0, 1_000), + }); + this.logger.error(`Delivery ${deliveryId} exhausted all retry attempts`); + } +} diff --git a/apps/api/src/webhook-support/webhook-signature.service.spec.ts b/apps/api/src/webhook-support/webhook-signature.service.spec.ts new file mode 100644 index 0000000..d8e88e8 --- /dev/null +++ b/apps/api/src/webhook-support/webhook-signature.service.spec.ts @@ -0,0 +1,65 @@ +import { WebhookSignatureService } from '../webhook-signature.service'; + +describe('WebhookSignatureService', () => { + let service: WebhookSignatureService; + + beforeEach(() => { + service = new WebhookSignatureService(); + }); + + describe('sign', () => { + it('returns a sha256= prefixed hex string', () => { + const sig = service.sign('{"event":"ping"}', 'my-secret'); + expect(sig).toMatch(/^sha256=[0-9a-f]{64}$/); + }); + + it('produces deterministic output for the same inputs', () => { + const a = service.sign('payload', 'secret'); + const b = service.sign('payload', 'secret'); + expect(a).toBe(b); + }); + + it('produces different output for different payloads', () => { + const a = service.sign('payload-A', 'secret'); + const b = service.sign('payload-B', 'secret'); + expect(a).not.toBe(b); + }); + + it('produces different output for different secrets', () => { + const a = service.sign('payload', 'secret-A'); + const b = service.sign('payload', 'secret-B'); + expect(a).not.toBe(b); + }); + }); + + describe('verify', () => { + it('returns true for a valid signature', () => { + const payload = JSON.stringify({ event: 'gas.spike_detected' }); + const secret = 'super-secret-key-16chars'; + const sig = service.sign(payload, secret); + expect(service.verify(payload, secret, sig)).toBe(true); + }); + + it('returns false for a tampered payload', () => { + const secret = 'super-secret-key-16chars'; + const sig = service.sign('original', secret); + expect(service.verify('tampered', secret, sig)).toBe(false); + }); + + it('returns false for a wrong secret', () => { + const payload = 'payload'; + const sig = service.sign(payload, 'correct-secret-16c'); + expect(service.verify(payload, 'wrong-secret-16ch', sig)).toBe(false); + }); + + it('returns false for an empty signature string', () => { + expect(service.verify('payload', 'secret', '')).toBe(false); + }); + + it('is not vulnerable to basic timing oracle (different length strings)', () => { + // timingSafeEqual would throw if lengths differ — verify() must not throw + expect(() => service.verify('payload', 'secret', 'tooshort')).not.toThrow(); + expect(service.verify('payload', 'secret', 'tooshort')).toBe(false); + }); + }); +}); diff --git a/apps/api/src/webhook-support/webhook-signature.service.ts b/apps/api/src/webhook-support/webhook-signature.service.ts new file mode 100644 index 0000000..7611149 --- /dev/null +++ b/apps/api/src/webhook-support/webhook-signature.service.ts @@ -0,0 +1,34 @@ +import { Injectable } from '@nestjs/common'; +import { createHmac, timingSafeEqual } from 'crypto'; + +@Injectable() +export class WebhookSignatureService { + private static readonly ALGORITHM = 'sha256'; + private static readonly HEADER_PREFIX = 'sha256='; + + /** + * Generates an HMAC-SHA256 signature for the given payload using the secret. + * Signature format: sha256= + */ + sign(payload: string, secret: string): string { + const digest = createHmac(WebhookSignatureService.ALGORITHM, secret) + .update(payload, 'utf8') + .digest('hex'); + + return `${WebhookSignatureService.HEADER_PREFIX}${digest}`; + } + + /** + * Constant-time comparison to prevent timing attacks. + */ + verify(payload: string, secret: string, signature: string): boolean { + const expected = this.sign(payload, secret); + + try { + return timingSafeEqual(Buffer.from(expected), Buffer.from(signature)); + } catch { + // Buffers differ in length → not equal + return false; + } + } +} diff --git a/apps/api/src/webhook-support/webhook.controller.spec.ts b/apps/api/src/webhook-support/webhook.controller.spec.ts new file mode 100644 index 0000000..408683d --- /dev/null +++ b/apps/api/src/webhook-support/webhook.controller.spec.ts @@ -0,0 +1,125 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { Webhook } from '../entities/webhook.entity'; +import { DeliveryStatus } from '../enums/delivery-status.enum'; +import { WebhookEvent } from '../enums/webhook-event.enum'; +import { WebhookController } from '../webhook.controller'; +import { WebhookService } from '../webhook.service'; + +const mockWebhookService = () => ({ + create: jest.fn(), + findAll: jest.fn(), + findOne: jest.fn(), + update: jest.fn(), + remove: jest.fn(), + ping: jest.fn(), + dispatch: jest.fn(), + listDeliveries: jest.fn(), + getDelivery: jest.fn(), + retryDelivery: jest.fn(), +}); + +const stubWebhook: Partial = { + id: 'wh-uuid-1', + name: 'Test Hook', + url: 'https://example.com/hook', + isActive: true, +}; + +describe('WebhookController', () => { + let controller: WebhookController; + let service: ReturnType; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + controllers: [WebhookController], + providers: [{ provide: WebhookService, useFactory: mockWebhookService }], + }).compile(); + + controller = module.get(WebhookController); + service = module.get(WebhookService); + }); + + afterEach(() => jest.clearAllMocks()); + + it('create — delegates to service and returns result', async () => { + service.create.mockResolvedValue(stubWebhook); + const dto = { + name: 'Test Hook', + url: 'https://example.com/hook', + secret: 'my-secret-key-16chars', + }; + expect(await controller.create(dto)).toBe(stubWebhook); + expect(service.create).toHaveBeenCalledWith(dto); + }); + + it('findAll — returns list from service', async () => { + service.findAll.mockResolvedValue([stubWebhook]); + expect(await controller.findAll()).toEqual([stubWebhook]); + }); + + it('findOne — passes id to service', async () => { + service.findOne.mockResolvedValue(stubWebhook); + expect(await controller.findOne('wh-uuid-1')).toBe(stubWebhook); + expect(service.findOne).toHaveBeenCalledWith('wh-uuid-1'); + }); + + it('update — merges dto and returns updated entity', async () => { + const updated = { ...stubWebhook, name: 'Renamed' }; + service.update.mockResolvedValue(updated); + const result = await controller.update('wh-uuid-1', { name: 'Renamed' }); + expect(result).toEqual(updated); + expect(service.update).toHaveBeenCalledWith('wh-uuid-1', { name: 'Renamed' }); + }); + + it('remove — calls service.remove', async () => { + service.remove.mockResolvedValue(undefined); + await controller.remove('wh-uuid-1'); + expect(service.remove).toHaveBeenCalledWith('wh-uuid-1'); + }); + + it('ping — calls service.ping with id', async () => { + service.ping.mockResolvedValue(undefined); + await controller.ping('wh-uuid-1'); + expect(service.ping).toHaveBeenCalledWith('wh-uuid-1'); + }); + + it('dispatch — passes dto to service', async () => { + service.dispatch.mockResolvedValue(undefined); + const dto = { event: WebhookEvent.GAS_SPIKE_DETECTED, data: { chain: 'eth' } }; + await controller.dispatch(dto); + expect(service.dispatch).toHaveBeenCalledWith(dto); + }); + + it('listDeliveries — returns paginated result', async () => { + const result = { + items: [ + { + id: 'del-1', + webhookId: 'wh-uuid-1', + event: WebhookEvent.PING, + status: DeliveryStatus.SUCCESS, + }, + ], + total: 1, + }; + service.listDeliveries.mockResolvedValue(result); + + const response = await controller.listDeliveries('wh-uuid-1', { page: 1, limit: 20 }); + + expect(response).toEqual(result); + expect(service.listDeliveries).toHaveBeenCalledWith('wh-uuid-1', { page: 1, limit: 20 }); + }); + + it('getDelivery — returns single delivery record', async () => { + const delivery = { id: 'del-1', status: DeliveryStatus.SUCCESS }; + service.getDelivery.mockResolvedValue(delivery); + + expect(await controller.getDelivery('wh-uuid-1', 'del-1')).toBe(delivery); + }); + + it('retryDelivery — delegates retry to service', async () => { + service.retryDelivery.mockResolvedValue(undefined); + await controller.retryDelivery('wh-uuid-1', 'del-1'); + expect(service.retryDelivery).toHaveBeenCalledWith('wh-uuid-1', 'del-1'); + }); +}); diff --git a/apps/api/src/webhook-support/webhook.controller.ts b/apps/api/src/webhook-support/webhook.controller.ts new file mode 100644 index 0000000..516222c --- /dev/null +++ b/apps/api/src/webhook-support/webhook.controller.ts @@ -0,0 +1,132 @@ +import { + Body, + Controller, + Delete, + Get, + HttpCode, + HttpStatus, + Param, + ParseUUIDPipe, + Patch, + Post, + Query, + UseGuards, +} from '@nestjs/common'; +import { WebhookAdminGuard } from './guards/webhook-admin.guard'; +import { + CreateWebhookDto, + DispatchEventDto, + ListDeliveriesQueryDto, + UpdateWebhookDto, +} from './dto/webhook.dto'; +import { WebhookDelivery } from './entities/webhook-delivery.entity'; +import { Webhook } from './entities/webhook.entity'; +import { WebhookService } from './webhook.service'; + +/** + * @tag Webhooks + * + * Endpoints + * ───────────────────────────────────────────────────────────────────────────── + * POST /webhooks Register a new webhook + * GET /webhooks List all registered webhooks + * GET /webhooks/:id Get a single webhook + * PATCH /webhooks/:id Update a webhook + * DELETE /webhooks/:id Remove a webhook + * POST /webhooks/:id/ping Send a PING test event + * GET /webhooks/:id/deliveries List delivery attempts + * GET /webhooks/:id/deliveries/:dId Get one delivery record + * POST /webhooks/:id/deliveries/:dId/retry Re-queue a failed delivery + * POST /webhooks/dispatch Dispatch an event (admin / internal) + */ +@Controller('webhooks') +export class WebhookController { + constructor(private readonly webhookService: WebhookService) {} + + // ────────────────────────────────────────────────────────────────────────── + // Webhook CRUD + // ────────────────────────────────────────────────────────────────────────── + + @Post() + create(@Body() dto: CreateWebhookDto): Promise { + return this.webhookService.create(dto); + } + + @Get() + findAll(): Promise { + return this.webhookService.findAll(); + } + + @Get(':id') + findOne(@Param('id', ParseUUIDPipe) id: string): Promise { + return this.webhookService.findOne(id); + } + + @Patch(':id') + update( + @Param('id', ParseUUIDPipe) id: string, + @Body() dto: UpdateWebhookDto, + ): Promise { + return this.webhookService.update(id, dto); + } + + @Delete(':id') + @HttpCode(HttpStatus.NO_CONTENT) + remove(@Param('id', ParseUUIDPipe) id: string): Promise { + return this.webhookService.remove(id); + } + + // ────────────────────────────────────────────────────────────────────────── + // Ping / test connectivity + // ────────────────────────────────────────────────────────────────────────── + + @Post(':id/ping') + @HttpCode(HttpStatus.ACCEPTED) + ping(@Param('id', ParseUUIDPipe) id: string): Promise { + return this.webhookService.ping(id); + } + + // ────────────────────────────────────────────────────────────────────────── + // Delivery logs + // ────────────────────────────────────────────────────────────────────────── + + @Get(':id/deliveries') + listDeliveries( + @Param('id', ParseUUIDPipe) id: string, + @Query() query: ListDeliveriesQueryDto, + ): Promise<{ items: WebhookDelivery[]; total: number }> { + return this.webhookService.listDeliveries(id, query); + } + + @Get(':id/deliveries/:dId') + getDelivery( + @Param('id', ParseUUIDPipe) id: string, + @Param('dId', ParseUUIDPipe) dId: string, + ): Promise { + return this.webhookService.getDelivery(id, dId); + } + + @Post(':id/deliveries/:dId/retry') + @HttpCode(HttpStatus.ACCEPTED) + retryDelivery( + @Param('id', ParseUUIDPipe) id: string, + @Param('dId', ParseUUIDPipe) dId: string, + ): Promise { + return this.webhookService.retryDelivery(id, dId); + } + + // ────────────────────────────────────────────────────────────────────────── + // Internal / Admin dispatch + // ────────────────────────────────────────────────────────────────────────── + + /** + * Triggers an event broadcast to all active, subscribed webhooks. + * Protected by WebhookAdminGuard — requires `Authorization: Bearer `. + */ + @UseGuards(WebhookAdminGuard) + @Post('dispatch') + @HttpCode(HttpStatus.ACCEPTED) + dispatch(@Body() dto: DispatchEventDto): Promise { + return this.webhookService.dispatch(dto); + } +} diff --git a/apps/api/src/webhook-support/webhook.e2e.spec.ts b/apps/api/src/webhook-support/webhook.e2e.spec.ts new file mode 100644 index 0000000..1f0e3c3 --- /dev/null +++ b/apps/api/src/webhook-support/webhook.e2e.spec.ts @@ -0,0 +1,276 @@ +/** + * Webhook Module — E2E Integration Test + * + * Uses an in-memory SQLite database (better-sqlite3) so no real Postgres + * or Redis is required during CI. BullMQ is mocked at the module level. + */ +import { HttpService } from '@nestjs/axios'; +import { BullModule, getQueueToken } from '@nestjs/bullmq'; +import { INestApplication, ValidationPipe } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { Test, TestingModule } from '@nestjs/testing'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { of } from 'rxjs'; +import * as request from 'supertest'; +import { WebhookDelivery } from '../entities/webhook-delivery.entity'; +import { Webhook } from '../entities/webhook.entity'; +import { WebhookEvent } from '../enums/webhook-event.enum'; +import { WEBHOOK_QUEUE } from '../webhook.constants'; +import { WebhookModule } from '../webhook.module'; + +// ── queue stub ──────────────────────────────────────────────────────────────── +const mockQueue = { add: jest.fn().mockResolvedValue({ id: 'job-1' }) }; + +// ── http stub ───────────────────────────────────────────────────────────────── +const mockHttpService = { + post: jest.fn(() => + of({ status: 200, data: { ok: true }, headers: {}, config: { headers: {} } }), + ), +}; + +// ───────────────────────────────────────────────────────────────────────────── + +describe('Webhook Module (e2e)', () => { + let app: INestApplication; + let createdWebhookId: string; + let createdDeliveryId: string; + + beforeAll(async () => { + const moduleRef: TestingModule = await Test.createTestingModule({ + imports: [ + ConfigModule.forRoot({ isGlobal: true, ignoreEnvFile: true }), + + // SQLite in-memory — no Docker needed + TypeOrmModule.forRoot({ + type: 'better-sqlite3', + database: ':memory:', + entities: [Webhook, WebhookDelivery], + synchronize: true, + logging: false, + }), + + // Register queue so the module compiles; we override the provider below + BullModule.forRoot({ connection: { host: 'localhost' } }), + BullModule.registerQueue({ name: WEBHOOK_QUEUE }), + + WebhookModule, + ], + }) + .overrideProvider(getQueueToken(WEBHOOK_QUEUE)) + .useValue(mockQueue) + .overrideProvider(HttpService) + .useValue(mockHttpService) + .compile(); + + app = moduleRef.createNestApplication(); + app.useGlobalPipes( + new ValidationPipe({ whitelist: true, forbidNonWhitelisted: true }), + ); + await app.init(); + }); + + afterAll(async () => { + await app.close(); + }); + + beforeEach(() => jest.clearAllMocks()); + + // ── POST /webhooks ───────────────────────────────────────────────────────── + + describe('POST /webhooks', () => { + it('201 — creates a webhook with valid payload', async () => { + const res = await request(app.getHttpServer()) + .post('/webhooks') + .send({ + name: 'Gas Alert Hook', + url: 'https://consumer.example.com/hook', + secret: 'a-very-secure-secret-key', + events: [WebhookEvent.GAS_SPIKE_DETECTED], + }) + .expect(201); + + expect(res.body).toMatchObject({ + id: expect.any(String), + name: 'Gas Alert Hook', + url: 'https://consumer.example.com/hook', + isActive: true, + maxRetries: 5, + }); + + createdWebhookId = res.body.id; + }); + + it('400 — rejects missing required fields', async () => { + await request(app.getHttpServer()) + .post('/webhooks') + .send({ name: 'No URL' }) + .expect(400); + }); + + it('400 — rejects secrets shorter than 16 characters', async () => { + await request(app.getHttpServer()) + .post('/webhooks') + .send({ + name: 'Short secret', + url: 'https://example.com/hook', + secret: 'tooshort', + }) + .expect(400); + }); + + it('400 — rejects unknown event types', async () => { + await request(app.getHttpServer()) + .post('/webhooks') + .send({ + name: 'Bad event', + url: 'https://example.com/hook', + secret: 'a-very-secure-secret-key', + events: ['not.a.real.event'], + }) + .expect(400); + }); + }); + + // ── GET /webhooks ────────────────────────────────────────────────────────── + + describe('GET /webhooks', () => { + it('200 — returns array of webhooks', async () => { + const res = await request(app.getHttpServer()).get('/webhooks').expect(200); + expect(Array.isArray(res.body)).toBe(true); + expect(res.body.length).toBeGreaterThanOrEqual(1); + }); + }); + + // ── GET /webhooks/:id ────────────────────────────────────────────────────── + + describe('GET /webhooks/:id', () => { + it('200 — returns the specific webhook', async () => { + const res = await request(app.getHttpServer()) + .get(`/webhooks/${createdWebhookId}`) + .expect(200); + + expect(res.body.id).toBe(createdWebhookId); + }); + + it('404 — unknown id', async () => { + await request(app.getHttpServer()) + .get('/webhooks/00000000-0000-0000-0000-000000000000') + .expect(404); + }); + }); + + // ── PATCH /webhooks/:id ──────────────────────────────────────────────────── + + describe('PATCH /webhooks/:id', () => { + it('200 — updates webhook name', async () => { + const res = await request(app.getHttpServer()) + .patch(`/webhooks/${createdWebhookId}`) + .send({ name: 'Renamed Hook' }) + .expect(200); + + expect(res.body.name).toBe('Renamed Hook'); + }); + }); + + // ── POST /webhooks/:id/ping ──────────────────────────────────────────────── + + describe('POST /webhooks/:id/ping', () => { + it('202 — queues a PING delivery', async () => { + await request(app.getHttpServer()) + .post(`/webhooks/${createdWebhookId}/ping`) + .expect(202); + + expect(mockQueue.add).toHaveBeenCalledWith( + 'deliver', + expect.objectContaining({ + webhookId: createdWebhookId, + payload: expect.objectContaining({ event: WebhookEvent.PING }), + }), + expect.any(Object), + ); + }); + }); + + // ── GET /webhooks/:id/deliveries ─────────────────────────────────────────── + + describe('GET /webhooks/:id/deliveries', () => { + it('200 — returns paginated delivery records', async () => { + const res = await request(app.getHttpServer()) + .get(`/webhooks/${createdWebhookId}/deliveries`) + .expect(200); + + expect(res.body).toMatchObject({ items: expect.any(Array), total: expect.any(Number) }); + if (res.body.items.length > 0) { + createdDeliveryId = res.body.items[0].id; + } + }); + + it('404 — unknown webhook id', async () => { + await request(app.getHttpServer()) + .get('/webhooks/00000000-0000-0000-0000-000000000000/deliveries') + .expect(404); + }); + }); + + // ── GET /webhooks/:id/deliveries/:dId ───────────────────────────────────── + + describe('GET /webhooks/:id/deliveries/:dId', () => { + it('200 — returns single delivery when it exists', async () => { + if (!createdDeliveryId) return; // no delivery created yet + const res = await request(app.getHttpServer()) + .get(`/webhooks/${createdWebhookId}/deliveries/${createdDeliveryId}`) + .expect(200); + + expect(res.body.id).toBe(createdDeliveryId); + }); + }); + + // ── POST /webhooks/dispatch ──────────────────────────────────────────────── + + describe('POST /webhooks/dispatch', () => { + beforeEach(() => { + // Set the admin secret in env so the guard passes + process.env.WEBHOOK_ADMIN_SECRET = 'admin-secret-key'; + }); + + it('202 — dispatches event to subscribed active webhooks', async () => { + await request(app.getHttpServer()) + .post('/webhooks/dispatch') + .set('Authorization', 'Bearer admin-secret-key') + .send({ event: WebhookEvent.GAS_SPIKE_DETECTED, data: { chain: 'eth' } }) + .expect(202); + + // Our hook subscribes to GAS_SPIKE_DETECTED → exactly 1 delivery queued + expect(mockQueue.add).toHaveBeenCalledWith( + 'deliver', + expect.objectContaining({ + payload: expect.objectContaining({ event: WebhookEvent.GAS_SPIKE_DETECTED }), + }), + expect.any(Object), + ); + }); + + it('401 — dispatch is rejected without admin token', async () => { + await request(app.getHttpServer()) + .post('/webhooks/dispatch') + .send({ event: WebhookEvent.PING }) + .expect(401); + }); + }); + + // ── DELETE /webhooks/:id ─────────────────────────────────────────────────── + + describe('DELETE /webhooks/:id', () => { + it('204 — removes the webhook', async () => { + await request(app.getHttpServer()) + .delete(`/webhooks/${createdWebhookId}`) + .expect(204); + }); + + it('404 — webhook no longer exists after deletion', async () => { + await request(app.getHttpServer()) + .get(`/webhooks/${createdWebhookId}`) + .expect(404); + }); + }); +}); diff --git a/apps/api/src/webhook-support/webhook.module.ts b/apps/api/src/webhook-support/webhook.module.ts new file mode 100644 index 0000000..a6f413e --- /dev/null +++ b/apps/api/src/webhook-support/webhook.module.ts @@ -0,0 +1,48 @@ +import { HttpModule } from '@nestjs/axios'; +import { BullModule } from '@nestjs/bullmq'; +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { WebhookDelivery } from './entities/webhook-delivery.entity'; +import { Webhook } from './entities/webhook.entity'; +import { WebhookController } from './webhook.controller'; +import { WEBHOOK_QUEUE } from './webhook.constants'; +import { WebhookDeliveryService } from './webhook-delivery.service'; +import { WebhookProcessor } from './webhook.processor'; +import { WebhookService } from './webhook.service'; +import { WebhookSignatureService } from './webhook-signature.service'; + +@Module({ + imports: [ + TypeOrmModule.forFeature([Webhook, WebhookDelivery]), + + BullModule.registerQueue({ + name: WEBHOOK_QUEUE, + defaultJobOptions: { + attempts: 5, + backoff: { type: 'exponential', delay: 5_000 }, + removeOnComplete: { age: 86_400 }, + removeOnFail: false, + }, + }), + + HttpModule.register({ + timeout: 10_000, + maxRedirects: 3, + }), + ], + + controllers: [WebhookController], + + providers: [ + WebhookService, + WebhookDeliveryService, + WebhookSignatureService, + WebhookProcessor, + ], + + exports: [ + WebhookService, // expose dispatch() to other modules + WebhookSignatureService, + ], +}) +export class WebhookModule {} diff --git a/apps/api/src/webhook-support/webhook.processor.spec.ts b/apps/api/src/webhook-support/webhook.processor.spec.ts new file mode 100644 index 0000000..ddc390c --- /dev/null +++ b/apps/api/src/webhook-support/webhook.processor.spec.ts @@ -0,0 +1,93 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { Job } from 'bullmq'; +import { WebhookEvent } from '../enums/webhook-event.enum'; +import { WebhookDeliveryJobData } from '../interfaces/webhook-payload.interface'; +import { WebhookDeliveryService } from '../webhook-delivery.service'; +import { WebhookProcessor } from '../webhook.processor'; + +const makeJob = (overrides: Partial> = {}) => + ({ + id: 'job-uuid-1', + name: 'deliver', + attemptsMade: 0, + opts: { attempts: 5 }, + data: { + webhookId: 'wh-1', + deliveryId: 'del-1', + url: 'https://consumer.example.com/hook', + secret: 'secret-key-16chars', + attempt: 0, + payload: { + id: 'payload-uuid', + event: WebhookEvent.PING, + createdAt: new Date().toISOString(), + data: {}, + }, + }, + ...overrides, + } as unknown as Job); + +describe('WebhookProcessor', () => { + let processor: WebhookProcessor; + let deliveryService: jest.Mocked; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + WebhookProcessor, + { + provide: WebhookDeliveryService, + useValue: { + deliver: jest.fn().mockResolvedValue(undefined), + markExhausted: jest.fn().mockResolvedValue(undefined), + }, + }, + ], + }).compile(); + + processor = module.get(WebhookProcessor); + deliveryService = module.get(WebhookDeliveryService); + }); + + afterEach(() => jest.clearAllMocks()); + + // ── process ─────────────────────────────────────────────────────────────── + + describe('process', () => { + it('calls deliveryService.deliver with job data + current attempt count', async () => { + const job = makeJob({ attemptsMade: 2 }); + await processor.process(job); + + expect(deliveryService.deliver).toHaveBeenCalledWith( + expect.objectContaining({ attempt: 2 }), + ); + }); + + it('propagates errors from deliveryService (so BullMQ can retry)', async () => { + deliveryService.deliver.mockRejectedValue(new Error('Network timeout')); + const job = makeJob(); + await expect(processor.process(job)).rejects.toThrow('Network timeout'); + }); + }); + + // ── onFailed ────────────────────────────────────────────────────────────── + + describe('onFailed', () => { + it('calls markExhausted when this is the final attempt', async () => { + const job = makeJob({ attemptsMade: 5, opts: { attempts: 5 } }); + await processor.onFailed(job, new Error('deadline exceeded')); + + expect(deliveryService.markExhausted).toHaveBeenCalledWith( + 'del-1', + 'deadline exceeded', + ); + }); + + it('does NOT call markExhausted when retries remain', async () => { + const job = makeJob({ attemptsMade: 2, opts: { attempts: 5 } }); + await processor.onFailed(job, new Error('temporary glitch')); + + expect(deliveryService.markExhausted).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/apps/api/src/webhook-support/webhook.processor.ts b/apps/api/src/webhook-support/webhook.processor.ts new file mode 100644 index 0000000..490b605 --- /dev/null +++ b/apps/api/src/webhook-support/webhook.processor.ts @@ -0,0 +1,41 @@ +import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import { WebhookDeliveryJobData } from './interfaces/webhook-payload.interface'; +import { WebhookDeliveryService } from './webhook-delivery.service'; +import { WEBHOOK_QUEUE } from './webhook.constants'; + +@Processor(WEBHOOK_QUEUE) +export class WebhookProcessor extends WorkerHost { + private readonly logger = new Logger(WebhookProcessor.name); + + constructor(private readonly deliveryService: WebhookDeliveryService) { + super(); + } + + async process(job: Job): Promise { + this.logger.debug(`Processing job ${job.id} (attempt ${job.attemptsMade + 1})`); + await this.deliveryService.deliver({ ...job.data, attempt: job.attemptsMade }); + } + + @OnWorkerEvent('failed') + async onFailed(job: Job, error: Error): Promise { + const isLastAttempt = job.attemptsMade >= (job.opts.attempts ?? 1); + + if (isLastAttempt) { + this.logger.error( + `Job ${job.id} permanently failed after ${job.attemptsMade} attempts: ${error.message}`, + ); + await this.deliveryService.markExhausted(job.data.deliveryId, error.message); + } else { + this.logger.warn( + `Job ${job.id} failed (attempt ${job.attemptsMade}), will retry: ${error.message}`, + ); + } + } + + @OnWorkerEvent('completed') + onCompleted(job: Job): void { + this.logger.debug(`Job ${job.id} completed successfully`); + } +} diff --git a/apps/api/src/webhook-support/webhook.service.spec.ts b/apps/api/src/webhook-support/webhook.service.spec.ts new file mode 100644 index 0000000..c4fe034 --- /dev/null +++ b/apps/api/src/webhook-support/webhook.service.spec.ts @@ -0,0 +1,302 @@ +import { getQueueToken } from '@nestjs/bullmq'; +import { NotFoundException } from '@nestjs/common'; +import { Test, TestingModule } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { WebhookDelivery } from '../entities/webhook-delivery.entity'; +import { Webhook } from '../entities/webhook.entity'; +import { DeliveryStatus } from '../enums/delivery-status.enum'; +import { WebhookEvent } from '../enums/webhook-event.enum'; +import { WEBHOOK_QUEUE } from '../webhook.constants'; +import { WebhookService } from '../webhook.service'; + +// ── helpers ────────────────────────────────────────────────────────────────── + +const makeWebhook = (overrides: Partial = {}): Webhook => + ({ + id: 'wh-uuid-1', + name: 'Test Hook', + url: 'https://example.com/hook', + secret: 'a-very-secret-key-32chars-xxxxxxx', + events: [], + isActive: true, + maxRetries: 5, + description: null, + deliveries: [], + createdAt: new Date(), + updatedAt: new Date(), + ...overrides, + } as Webhook); + +const makeDelivery = (overrides: Partial = {}): WebhookDelivery => + ({ + id: 'del-uuid-1', + webhookId: 'wh-uuid-1', + event: WebhookEvent.PING, + payload: {}, + status: DeliveryStatus.FAILED, + responseStatus: null, + responseBody: null, + errorMessage: 'timeout', + attempt: 3, + nextRetryAt: null, + deliveredAt: null, + createdAt: new Date(), + ...overrides, + } as WebhookDelivery); + +// ── mocks ───────────────────────────────────────────────────────────────────── + +const mockWebhookRepo = () => ({ + create: jest.fn(), + save: jest.fn(), + find: jest.fn(), + findOne: jest.fn(), + remove: jest.fn(), + update: jest.fn(), +}); + +const mockDeliveryRepo = () => ({ + create: jest.fn(), + save: jest.fn(), + findAndCount: jest.fn(), + findOne: jest.fn(), + update: jest.fn(), +}); + +const mockQueue = () => ({ + add: jest.fn().mockResolvedValue({ id: 'job-1' }), +}); + +// ── tests ───────────────────────────────────────────────────────────────────── + +describe('WebhookService', () => { + let service: WebhookService; + let webhookRepo: ReturnType; + let deliveryRepo: ReturnType; + let queue: ReturnType; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + WebhookService, + { provide: getRepositoryToken(Webhook), useFactory: mockWebhookRepo }, + { provide: getRepositoryToken(WebhookDelivery), useFactory: mockDeliveryRepo }, + { provide: getQueueToken(WEBHOOK_QUEUE), useFactory: mockQueue }, + ], + }).compile(); + + service = module.get(WebhookService); + webhookRepo = module.get(getRepositoryToken(Webhook)); + deliveryRepo = module.get(getRepositoryToken(WebhookDelivery)); + queue = module.get(getQueueToken(WEBHOOK_QUEUE)); + }); + + // ── create ──────────────────────────────────────────────────────────────── + + describe('create', () => { + it('creates and saves a webhook with defaults applied', async () => { + const dto = { + name: 'My Hook', + url: 'https://example.com/hook', + secret: 'my-secret-key-16chars', + }; + const created = makeWebhook(); + webhookRepo.create.mockReturnValue(created); + webhookRepo.save.mockResolvedValue(created); + + const result = await service.create(dto); + + expect(webhookRepo.create).toHaveBeenCalledWith( + expect.objectContaining({ + events: [], + isActive: true, + maxRetries: 5, + }), + ); + expect(result).toEqual(created); + }); + + it('respects caller-supplied events and maxRetries', async () => { + const dto = { + name: 'Selective Hook', + url: 'https://example.com/hook', + secret: 'my-secret-key-16chars', + events: [WebhookEvent.GAS_SPIKE_DETECTED], + maxRetries: 3, + }; + webhookRepo.create.mockReturnValue(makeWebhook()); + webhookRepo.save.mockResolvedValue(makeWebhook()); + + await service.create(dto); + + expect(webhookRepo.create).toHaveBeenCalledWith( + expect.objectContaining({ + events: [WebhookEvent.GAS_SPIKE_DETECTED], + maxRetries: 3, + }), + ); + }); + }); + + // ── findOne ─────────────────────────────────────────────────────────────── + + describe('findOne', () => { + it('returns the webhook when found', async () => { + const wh = makeWebhook(); + webhookRepo.findOne.mockResolvedValue(wh); + expect(await service.findOne('wh-uuid-1')).toEqual(wh); + }); + + it('throws NotFoundException when not found', async () => { + webhookRepo.findOne.mockResolvedValue(null); + await expect(service.findOne('missing-id')).rejects.toThrow(NotFoundException); + }); + }); + + // ── update ──────────────────────────────────────────────────────────────── + + describe('update', () => { + it('merges dto fields onto the entity and saves', async () => { + const wh = makeWebhook(); + webhookRepo.findOne.mockResolvedValue(wh); + const updated = { ...wh, name: 'Renamed' }; + webhookRepo.save.mockResolvedValue(updated); + + const result = await service.update('wh-uuid-1', { name: 'Renamed' }); + expect(result.name).toBe('Renamed'); + expect(webhookRepo.save).toHaveBeenCalledWith(expect.objectContaining({ name: 'Renamed' })); + }); + }); + + // ── remove ──────────────────────────────────────────────────────────────── + + describe('remove', () => { + it('removes the webhook', async () => { + const wh = makeWebhook(); + webhookRepo.findOne.mockResolvedValue(wh); + webhookRepo.remove.mockResolvedValue(undefined); + + await service.remove('wh-uuid-1'); + expect(webhookRepo.remove).toHaveBeenCalledWith(wh); + }); + }); + + // ── dispatch ────────────────────────────────────────────────────────────── + + describe('dispatch', () => { + it('skips queue when no active subscribers exist', async () => { + webhookRepo.find.mockResolvedValue([]); + + await service.dispatch({ event: WebhookEvent.PING }); + + expect(queue.add).not.toHaveBeenCalled(); + }); + + it('enqueues a delivery for each subscriber', async () => { + const hooks = [makeWebhook({ id: 'wh-1' }), makeWebhook({ id: 'wh-2' })]; + webhookRepo.find.mockResolvedValue(hooks); + + const delivery = makeDelivery(); + deliveryRepo.create.mockReturnValue(delivery); + deliveryRepo.save.mockResolvedValue(delivery); + + await service.dispatch({ event: WebhookEvent.PING, data: { test: true } }); + + expect(queue.add).toHaveBeenCalledTimes(2); + expect(queue.add).toHaveBeenCalledWith( + 'deliver', + expect.objectContaining({ url: hooks[0].url }), + expect.any(Object), + ); + }); + + it('only sends to webhooks subscribed to the event', async () => { + const subscribed = makeWebhook({ events: [WebhookEvent.GAS_SPIKE_DETECTED] }); + const unsubscribed = makeWebhook({ id: 'wh-2', events: [WebhookEvent.USER_CREATED] }); + webhookRepo.find.mockResolvedValue([subscribed, unsubscribed]); + + const delivery = makeDelivery(); + deliveryRepo.create.mockReturnValue(delivery); + deliveryRepo.save.mockResolvedValue(delivery); + + await service.dispatch({ event: WebhookEvent.GAS_SPIKE_DETECTED }); + + expect(queue.add).toHaveBeenCalledTimes(1); + }); + + it('sends to all-events webhooks (empty events array = subscribe all)', async () => { + const catchAll = makeWebhook({ events: [] }); // empty = all + webhookRepo.find.mockResolvedValue([catchAll]); + deliveryRepo.create.mockReturnValue(makeDelivery()); + deliveryRepo.save.mockResolvedValue(makeDelivery()); + + await service.dispatch({ event: WebhookEvent.ALERT_TRIGGERED }); + + expect(queue.add).toHaveBeenCalledTimes(1); + }); + }); + + // ── ping ────────────────────────────────────────────────────────────────── + + describe('ping', () => { + it('dispatches a PING event to the specific webhook', async () => { + const wh = makeWebhook(); + webhookRepo.findOne.mockResolvedValue(wh); + deliveryRepo.create.mockReturnValue(makeDelivery()); + deliveryRepo.save.mockResolvedValue(makeDelivery()); + + await service.ping('wh-uuid-1'); + + expect(queue.add).toHaveBeenCalledWith( + 'deliver', + expect.objectContaining({ webhookId: 'wh-uuid-1' }), + expect.any(Object), + ); + }); + }); + + // ── retryDelivery ───────────────────────────────────────────────────────── + + describe('retryDelivery', () => { + it('resets attempt count and re-queues the delivery', async () => { + const wh = makeWebhook(); + const delivery = makeDelivery({ status: DeliveryStatus.EXHAUSTED }); + webhookRepo.findOne.mockResolvedValue(wh); + deliveryRepo.findOne.mockResolvedValue(delivery); + deliveryRepo.update.mockResolvedValue(undefined); + deliveryRepo.create.mockReturnValue(delivery); + deliveryRepo.save.mockResolvedValue(delivery); + + await service.retryDelivery('wh-uuid-1', 'del-uuid-1'); + + expect(deliveryRepo.update).toHaveBeenCalledWith( + delivery.id, + expect.objectContaining({ status: DeliveryStatus.RETRYING, attempt: 0 }), + ); + expect(queue.add).toHaveBeenCalled(); + }); + + it('throws NotFoundException for unknown delivery', async () => { + webhookRepo.findOne.mockResolvedValue(makeWebhook()); + deliveryRepo.findOne.mockResolvedValue(null); + + await expect(service.retryDelivery('wh-uuid-1', 'no-delivery')).rejects.toThrow( + NotFoundException, + ); + }); + }); + + // ── listDeliveries ──────────────────────────────────────────────────────── + + describe('listDeliveries', () => { + it('returns paginated delivery records', async () => { + webhookRepo.findOne.mockResolvedValue(makeWebhook()); + const deliveries = [makeDelivery()]; + deliveryRepo.findAndCount.mockResolvedValue([deliveries, 1]); + + const result = await service.listDeliveries('wh-uuid-1', { page: 1, limit: 20 }); + + expect(result).toEqual({ items: deliveries, total: 1 }); + }); + }); +}); diff --git a/apps/api/src/webhook-support/webhook.service.ts b/apps/api/src/webhook-support/webhook.service.ts new file mode 100644 index 0000000..503084c --- /dev/null +++ b/apps/api/src/webhook-support/webhook.service.ts @@ -0,0 +1,214 @@ +import { InjectQueue } from '@nestjs/bullmq'; +import { + Injectable, + Logger, + NotFoundException, +} from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Queue } from 'bullmq'; +import { randomUUID } from 'crypto'; +import { Repository } from 'typeorm'; +import { + CreateWebhookDto, + DispatchEventDto, + ListDeliveriesQueryDto, + UpdateWebhookDto, +} from './dto/webhook.dto'; +import { WebhookDelivery } from './entities/webhook-delivery.entity'; +import { Webhook } from './entities/webhook.entity'; +import { DeliveryStatus } from './enums/delivery-status.enum'; +import { WebhookEvent } from './enums/webhook-event.enum'; +import { + WebhookDeliveryJobData, + WebhookPayload, +} from './interfaces/webhook-payload.interface'; +import { WEBHOOK_QUEUE } from './webhook.constants'; + +@Injectable() +export class WebhookService { + private readonly logger = new Logger(WebhookService.name); + + constructor( + @InjectRepository(Webhook) + private readonly webhookRepo: Repository, + @InjectRepository(WebhookDelivery) + private readonly deliveryRepo: Repository, + @InjectQueue(WEBHOOK_QUEUE) + private readonly webhookQueue: Queue, + ) {} + + // ────────────────────────────────────────────────────────────────────────── + // CRUD + // ────────────────────────────────────────────────────────────────────────── + + async create(dto: CreateWebhookDto): Promise { + const webhook = this.webhookRepo.create({ + ...dto, + events: dto.events ?? [], + isActive: dto.isActive ?? true, + maxRetries: dto.maxRetries ?? 5, + }); + return this.webhookRepo.save(webhook); + } + + async findAll(): Promise { + return this.webhookRepo.find({ order: { createdAt: 'DESC' } }); + } + + async findOne(id: string): Promise { + const webhook = await this.webhookRepo.findOne({ where: { id } }); + if (!webhook) throw new NotFoundException(`Webhook ${id} not found`); + return webhook; + } + + async update(id: string, dto: UpdateWebhookDto): Promise { + const webhook = await this.findOne(id); + Object.assign(webhook, dto); + return this.webhookRepo.save(webhook); + } + + async remove(id: string): Promise { + const webhook = await this.findOne(id); + await this.webhookRepo.remove(webhook); + } + + // ────────────────────────────────────────────────────────────────────────── + // Deliveries + // ────────────────────────────────────────────────────────────────────────── + + async listDeliveries( + webhookId: string, + query: ListDeliveriesQueryDto, + ): Promise<{ items: WebhookDelivery[]; total: number }> { + await this.findOne(webhookId); // 404 guard + + const [items, total] = await this.deliveryRepo.findAndCount({ + where: { webhookId }, + order: { createdAt: 'DESC' }, + skip: ((query.page ?? 1) - 1) * (query.limit ?? 20), + take: query.limit ?? 20, + }); + + return { items, total }; + } + + async getDelivery(webhookId: string, deliveryId: string): Promise { + await this.findOne(webhookId); + const delivery = await this.deliveryRepo.findOne({ + where: { id: deliveryId, webhookId }, + }); + if (!delivery) throw new NotFoundException(`Delivery ${deliveryId} not found`); + return delivery; + } + + /** Manually re-queue a failed delivery */ + async retryDelivery(webhookId: string, deliveryId: string): Promise { + const webhook = await this.findOne(webhookId); + const delivery = await this.getDelivery(webhookId, deliveryId); + + await this.deliveryRepo.update(delivery.id, { + status: DeliveryStatus.RETRYING, + attempt: 0, + }); + + await this.enqueueDelivery(webhook, delivery, delivery.payload as WebhookPayload, 0); + } + + // ────────────────────────────────────────────────────────────────────────── + // Event dispatch + // ────────────────────────────────────────────────────────────────────────── + + /** + * Dispatches an event to all active, subscribed webhooks. + * Call this from any service that emits domain events. + */ + async dispatch(dto: DispatchEventDto): Promise { + const subscribers = await this.findSubscribers(dto.event); + + if (!subscribers.length) { + this.logger.debug(`No subscribers for event "${dto.event}"`); + return; + } + + const payload: WebhookPayload = { + id: randomUUID(), + event: dto.event, + createdAt: new Date().toISOString(), + data: dto.data ?? {}, + }; + + await Promise.all( + subscribers.map((webhook) => this.scheduleDelivery(webhook, payload)), + ); + } + + /** + * Sends a PING event to a single webhook (used to test connectivity). + */ + async ping(id: string): Promise { + const webhook = await this.findOne(id); + await this.scheduleDelivery(webhook, { + id: randomUUID(), + event: WebhookEvent.PING, + createdAt: new Date().toISOString(), + data: { message: 'Webhook ping test' }, + }); + } + + // ────────────────────────────────────────────────────────────────────────── + // Internal helpers + // ────────────────────────────────────────────────────────────────────────── + + private async findSubscribers(event: WebhookEvent): Promise { + const all = await this.webhookRepo.find({ where: { isActive: true } }); + return all.filter( + (w) => !w.events.length || w.events.includes(event), + ); + } + + private async scheduleDelivery( + webhook: Webhook, + payload: WebhookPayload, + ): Promise { + const delivery = this.deliveryRepo.create({ + webhookId: webhook.id, + event: payload.event, + payload: payload as unknown as Record, + status: DeliveryStatus.PENDING, + attempt: 0, + }); + + const saved = await this.deliveryRepo.save(delivery); + await this.enqueueDelivery(webhook, saved, payload, 0); + } + + private async enqueueDelivery( + webhook: Webhook, + delivery: WebhookDelivery, + payload: WebhookPayload, + attempt: number, + ): Promise { + const jobData: WebhookDeliveryJobData = { + webhookId: webhook.id, + deliveryId: delivery.id, + url: webhook.url, + secret: webhook.secret, + payload, + attempt, + }; + + await this.webhookQueue.add('deliver', jobData, { + attempts: webhook.maxRetries, + backoff: { + type: 'exponential', + delay: 5_000, // 5 s → 10 s → 20 s … + }, + removeOnComplete: { age: 86_400 }, // keep 24 h + removeOnFail: false, + }); + + this.logger.log( + `Queued delivery ${delivery.id} for webhook ${webhook.id} (event: ${payload.event})`, + ); + } +}