diff --git a/app/backend/prisma/migrations/20260328000000_add_webhook_subscriptions/migration.sql b/app/backend/prisma/migrations/20260328000000_add_webhook_subscriptions/migration.sql new file mode 100644 index 0000000..43bf19b --- /dev/null +++ b/app/backend/prisma/migrations/20260328000000_add_webhook_subscriptions/migration.sql @@ -0,0 +1,40 @@ +CREATE TABLE "WebhookSubscription" ( + "id" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "apiKeyId" TEXT NOT NULL, + "url" TEXT NOT NULL, + "secret" TEXT NOT NULL, + "events" TEXT[] NOT NULL, + "isActive" BOOLEAN NOT NULL DEFAULT true, + + CONSTRAINT "WebhookSubscription_pkey" PRIMARY KEY ("id") +); + +CREATE TABLE "WebhookDeliveryAttempt" ( + "id" TEXT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "subscriptionId" TEXT NOT NULL, + "event" TEXT NOT NULL, + "attempt" INTEGER NOT NULL, + "status" TEXT NOT NULL, + "payload" JSONB NOT NULL, + "responseStatus" INTEGER, + "responseBody" TEXT, + "errorMessage" TEXT, + "deliveredAt" TIMESTAMP(3), + + CONSTRAINT "WebhookDeliveryAttempt_pkey" PRIMARY KEY ("id") +); + +CREATE INDEX "WebhookSubscription_apiKeyId_idx" ON "WebhookSubscription"("apiKeyId"); +CREATE INDEX "WebhookSubscription_isActive_idx" ON "WebhookSubscription"("isActive"); +CREATE INDEX "WebhookDeliveryAttempt_subscriptionId_createdAt_idx" ON "WebhookDeliveryAttempt"("subscriptionId", "createdAt"); +CREATE INDEX "WebhookDeliveryAttempt_event_status_idx" ON "WebhookDeliveryAttempt"("event", "status"); + +ALTER TABLE "WebhookSubscription" ADD CONSTRAINT "WebhookSubscription_apiKeyId_fkey" +FOREIGN KEY ("apiKeyId") REFERENCES "ApiKey"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +ALTER TABLE "WebhookDeliveryAttempt" ADD CONSTRAINT "WebhookDeliveryAttempt_subscriptionId_fkey" +FOREIGN KEY ("subscriptionId") REFERENCES "WebhookSubscription"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/app/backend/prisma/schema.prisma b/app/backend/prisma/schema.prisma index 5b78a88..2bd2b97 100644 --- a/app/backend/prisma/schema.prisma +++ b/app/backend/prisma/schema.prisma @@ -88,6 +88,46 @@ model Claim { @@index([deletedAt]) } +model WebhookSubscription { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + apiKeyId String + apiKey ApiKey @relation(fields: [apiKeyId], references: [id], onDelete: Cascade) + + url String + secret String + events String[] + isActive Boolean @default(true) + + deliveries WebhookDeliveryAttempt[] + + @@index([apiKeyId]) + @@index([isActive]) +} + +model WebhookDeliveryAttempt { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + subscriptionId String + subscription WebhookSubscription @relation(fields: [subscriptionId], references: [id], onDelete: Cascade) + + event String + attempt Int + status String + payload Json + responseStatus Int? + responseBody String? + errorMessage String? + deliveredAt DateTime? + + @@index([subscriptionId, createdAt]) + @@index([event, status]) +} + model AuditLog { id String @id @default(cuid()) actorId String @@ -149,5 +189,5 @@ model ApiKey { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - @@index([ngoId]) + webhookSubscriptions WebhookSubscription[] } diff --git a/app/backend/src/app.module.ts b/app/backend/src/app.module.ts index 2a4edde..52374c2 100644 --- a/app/backend/src/app.module.ts +++ b/app/backend/src/app.module.ts @@ -31,8 +31,7 @@ import { LoggingInterceptor } from './interceptors/logging.interceptor'; import { LoggerService } from './logger/logger.service'; import { AllExceptionsFilter } from './common/filters/http-exception.filter'; import { AnalyticsModule } from './analytics/analytics.module'; -import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler'; -import { AidEscrowModule } from './onchain/aid-escrow.module'; +import { WebhooksModule } from './webhooks/webhooks.module'; @Module({ imports: [ @@ -75,13 +74,7 @@ import { AidEscrowModule } from './onchain/aid-escrow.module'; NotificationsModule, JobsModule, AnalyticsModule, - AidEscrowModule, - ThrottlerModule.forRoot([ - { - ttl: 60000, // 60 seconds window - limit: 20, // default: 20 req/min - }, - ]), + WebhooksModule, ], diff --git a/app/backend/src/campaigns/campaigns.controller.ts b/app/backend/src/campaigns/campaigns.controller.ts index e65e4d5..582582a 100644 --- a/app/backend/src/campaigns/campaigns.controller.ts +++ b/app/backend/src/campaigns/campaigns.controller.ts @@ -134,6 +134,7 @@ export class CampaignsController { @ApiForbiddenResponse({ description: 'Access denied - insufficient permissions.', }) + @Patch(':id/archive') async archive(@Param('id') id: string) { const campaignData = await this.campaigns.archive(id); const { campaign, alreadyArchived } = campaignData; diff --git a/app/backend/src/campaigns/campaigns.module.ts b/app/backend/src/campaigns/campaigns.module.ts index 6fbd777..bee7323 100644 --- a/app/backend/src/campaigns/campaigns.module.ts +++ b/app/backend/src/campaigns/campaigns.module.ts @@ -1,8 +1,11 @@ import { Module } from '@nestjs/common'; import { CampaignsController } from './campaigns.controller'; import { CampaignsService } from './campaigns.service'; +import { PrismaModule } from '../prisma/prisma.module'; +import { WebhooksModule } from '../webhooks/webhooks.module'; @Module({ + imports: [PrismaModule, WebhooksModule], controllers: [CampaignsController], providers: [CampaignsService], }) diff --git a/app/backend/src/campaigns/campaigns.service.spec.ts b/app/backend/src/campaigns/campaigns.service.spec.ts index 6a27c28..fe498b2 100644 --- a/app/backend/src/campaigns/campaigns.service.spec.ts +++ b/app/backend/src/campaigns/campaigns.service.spec.ts @@ -4,10 +4,14 @@ import { Campaign, CampaignStatus, Prisma } from '@prisma/client'; import { CampaignsService } from './campaigns.service'; import { PrismaService } from 'src/prisma/prisma.service'; import { DeepMockProxy, mockDeep } from 'jest-mock-extended'; +import { WebhooksService } from '../webhooks/webhooks.service'; describe('CampaignsService', () => { let service: CampaignsService; let prismaMock: DeepMockProxy; + const webhooksService = { + enqueueEvent: jest.fn().mockResolvedValue(1), + }; const now = new Date('2026-01-25T00:00:00.000Z'); @@ -32,6 +36,7 @@ describe('CampaignsService', () => { providers: [ CampaignsService, { provide: PrismaService, useValue: prismaMock }, + { provide: WebhooksService, useValue: webhooksService }, ], }).compile(); @@ -156,4 +161,39 @@ describe('CampaignsService', () => { expect(updateArgs?.data).toMatchObject({ deletedAt: expect.any(Date) }); expect(result.deletedAt).not.toBeNull(); }); + + it('update(): emits campaign.completed when status transitions to completed', async () => { + const existing: Campaign = { + id: 'c1', + name: 'A', + status: CampaignStatus.active, + budget: new Prisma.Decimal('10.00'), + metadata: null, + archivedAt: null, + createdAt: now, + updatedAt: now, + }; + const updated: Campaign = { + ...existing, + status: CampaignStatus.completed, + updatedAt: new Date('2026-01-26T00:00:00.000Z'), + }; + + prismaMock.campaign.findUnique.mockResolvedValue(existing); + prismaMock.campaign.update.mockResolvedValue(updated); + + await service.update('c1', { status: CampaignStatus.completed }); + + expect(webhooksService.enqueueEvent).toHaveBeenCalledWith( + 'campaign.completed', + expect.objectContaining({ + event: 'campaign.completed', + campaign: expect.objectContaining({ + id: 'c1', + status: CampaignStatus.completed, + }), + previousStatus: CampaignStatus.active, + }), + ); + }); }); diff --git a/app/backend/src/campaigns/campaigns.service.ts b/app/backend/src/campaigns/campaigns.service.ts index d0d5936..c3baca4 100644 --- a/app/backend/src/campaigns/campaigns.service.ts +++ b/app/backend/src/campaigns/campaigns.service.ts @@ -3,10 +3,14 @@ import { CampaignStatus, Prisma } from '@prisma/client'; import { PrismaService } from '../prisma/prisma.service'; import { CreateCampaignDto } from './dto/create-campaign.dto'; import { UpdateCampaignDto } from './dto/update-campaign.dto'; +import { WebhooksService } from '../webhooks/webhooks.service'; @Injectable() export class CampaignsService { - constructor(private readonly prisma: PrismaService) {} + constructor( + private readonly prisma: PrismaService, + private readonly webhooksService: WebhooksService, + ) {} private sanitizeMetadata( metadata?: Record, @@ -51,9 +55,9 @@ export class CampaignsService { } async update(id: string, dto: UpdateCampaignDto) { - await this.findOne(id); + const existing = await this.findOne(id); - return this.prisma.campaign.update({ + const updated = await this.prisma.campaign.update({ where: { id }, data: { name: dto.name, @@ -65,6 +69,26 @@ export class CampaignsService { : this.sanitizeMetadata(dto.metadata), }, }); + + if ( + updated.status === CampaignStatus.completed && + existing.status !== CampaignStatus.completed + ) { + await this.webhooksService.enqueueEvent('campaign.completed', { + event: 'campaign.completed', + occurredAt: updated.updatedAt.toISOString(), + campaign: { + id: updated.id, + name: updated.name, + status: updated.status, + budget: updated.budget.toString(), + archivedAt: updated.archivedAt?.toISOString() ?? null, + }, + previousStatus: existing.status, + }); + } + + return updated; } async archive(id: string) { diff --git a/app/backend/src/claims/claims.module.ts b/app/backend/src/claims/claims.module.ts index f2de0da..73cd1ae 100644 --- a/app/backend/src/claims/claims.module.ts +++ b/app/backend/src/claims/claims.module.ts @@ -6,6 +6,7 @@ import { OnchainModule } from '../onchain/onchain.module'; import { MetricsModule } from '../observability/metrics/metrics.module'; import { LoggerModule } from '../logger/logger.module'; import { AuditModule } from '../audit/audit.module'; +import { WebhooksModule } from '../webhooks/webhooks.module'; import { EncryptionModule } from '../common/encryption/encryption.module'; @Module({ @@ -15,6 +16,7 @@ import { EncryptionModule } from '../common/encryption/encryption.module'; MetricsModule, LoggerModule, AuditModule, + WebhooksModule, EncryptionModule, ], controllers: [ClaimsController], diff --git a/app/backend/src/claims/claims.service.spec.ts b/app/backend/src/claims/claims.service.spec.ts index 7e5d5bb..ed21973 100644 --- a/app/backend/src/claims/claims.service.spec.ts +++ b/app/backend/src/claims/claims.service.spec.ts @@ -13,6 +13,7 @@ import { MetricsService } from '../observability/metrics/metrics.service'; import { AuditService } from '../audit/audit.service'; import { EncryptionService } from '../common/encryption/encryption.service'; import { ClaimStatus, Prisma } from '@prisma/client'; +import { WebhooksService } from '../webhooks/webhooks.service'; describe('ClaimsService', () => { let service: ClaimsService; @@ -63,6 +64,10 @@ describe('ClaimsService', () => { record: jest.fn().mockResolvedValue({ id: 'audit-1' }), }; + const mockWebhooksService = { + enqueueEvent: jest.fn().mockResolvedValue(1), + }; + beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ @@ -121,6 +126,10 @@ describe('ClaimsService', () => { decryptDeterministic: jest.fn((v: string) => v), }, }, + { + provide: WebhooksService, + useValue: mockWebhooksService, + }, ], }).compile(); @@ -134,6 +143,39 @@ describe('ClaimsService', () => { jest.clearAllMocks(); }); + it('verify should enqueue claim.verified webhooks after a successful transition', async () => { + jest.spyOn(prismaService.claim, 'findUnique').mockResolvedValueOnce({ + ...mockClaim, + status: ClaimStatus.requested, + } as typeof mockClaim); + type TxClient = { claim: { update: jest.Mock } }; + jest + .spyOn(prismaService, '$transaction') + .mockImplementation( + async (callback: (tx: TxClient) => Promise) => + callback({ + claim: { + update: jest.fn().mockResolvedValue({ + ...mockClaim, + status: ClaimStatus.verified, + }), + }, + }), + ); + + await service.verify('claim-123'); + + expect(mockWebhooksService.enqueueEvent).toHaveBeenCalledWith( + 'claim.verified', + expect.objectContaining({ + event: 'claim.verified', + claim: expect.objectContaining({ + id: 'claim-123', + }), + }), + ); + }); + describe('disburse', () => { it('should call on-chain adapter when enabled', async () => { jest @@ -308,6 +350,10 @@ describe('ClaimsService', () => { decryptDeterministic: jest.fn((v: string) => v), }, }, + { + provide: WebhooksService, + useValue: mockWebhooksService, + }, ], }).compile(); @@ -357,6 +403,38 @@ describe('ClaimsService', () => { ); }); + it('should enqueue claim.disbursed webhooks after a successful disbursement', async () => { + jest + .spyOn(prismaService.claim, 'findUnique') + .mockResolvedValue(mockClaim); + type TxClient = { claim: { update: jest.Mock } }; + jest + .spyOn(prismaService, '$transaction') + .mockImplementation( + async (callback: (tx: TxClient) => Promise) => + callback({ + claim: { + update: jest.fn().mockResolvedValue({ + ...mockClaim, + status: ClaimStatus.disbursed, + }), + }, + }), + ); + + await service.disburse('claim-123'); + + expect(mockWebhooksService.enqueueEvent).toHaveBeenCalledWith( + 'claim.disbursed', + expect.objectContaining({ + event: 'claim.disbursed', + claim: expect.objectContaining({ + id: 'claim-123', + }), + }), + ); + }); + it('should throw NotFoundException if claim does not exist', async () => { jest.spyOn(prismaService.claim, 'findUnique').mockResolvedValue(null); diff --git a/app/backend/src/claims/claims.service.ts b/app/backend/src/claims/claims.service.ts index c39e798..d9c1092 100644 --- a/app/backend/src/claims/claims.service.ts +++ b/app/backend/src/claims/claims.service.ts @@ -10,7 +10,7 @@ import { ConfigService } from '@nestjs/config'; import { createHash } from 'crypto'; import { PrismaService } from '../prisma/prisma.service'; import { CreateClaimDto } from './dto/create-claim.dto'; -import { ClaimStatus } from '@prisma/client'; +import { ClaimStatus, Prisma } from '@prisma/client'; import { OnchainAdapter, DisburseResult, @@ -19,6 +19,7 @@ import { import { LoggerService } from '../logger/logger.service'; import { MetricsService } from '../observability/metrics/metrics.service'; import { AuditService } from '../audit/audit.service'; +import { WebhooksService } from '../webhooks/webhooks.service'; import { EncryptionService } from '../common/encryption/encryption.service'; @Injectable() @@ -36,6 +37,7 @@ export class ClaimsService { private readonly metricsService: MetricsService, private readonly auditService: AuditService, private readonly encryptionService: EncryptionService, + private readonly webhooksService: WebhooksService, ) { this.onchainEnabled = this.configService.get('ONCHAIN_ENABLED') === 'true'; @@ -337,7 +339,60 @@ export class ClaimsService { return updated; }); - return updatedClaim; + const hydratedClaim = { + ...updatedClaim, + recipientRef: this.encryptionService.decrypt(updatedClaim.recipientRef), + }; + + await this.emitWebhookForClaimStatus(hydratedClaim, fromStatus, toStatus); + + return hydratedClaim; + } + + private async emitWebhookForClaimStatus( + claim: { + id: string; + status: ClaimStatus; + campaignId: string; + amount: Prisma.Decimal; + recipientRef: string; + evidenceRef?: string | null; + updatedAt: Date; + }, + fromStatus: ClaimStatus, + toStatus: ClaimStatus, + ) { + if (toStatus === ClaimStatus.verified) { + await this.webhooksService.enqueueEvent('claim.verified', { + event: 'claim.verified', + occurredAt: claim.updatedAt.toISOString(), + claim: { + id: claim.id, + campaignId: claim.campaignId, + status: claim.status, + amount: claim.amount.toString(), + recipientRef: claim.recipientRef, + evidenceRef: claim.evidenceRef ?? null, + }, + previousStatus: fromStatus, + }); + } + + if (toStatus === ClaimStatus.disbursed) { + await this.webhooksService.enqueueEvent('claim.disbursed', { + event: 'claim.disbursed', + occurredAt: claim.updatedAt.toISOString(), + claim: { + id: claim.id, + campaignId: claim.campaignId, + status: claim.status, + amount: claim.amount.toString(), + recipientRef: claim.recipientRef, + evidenceRef: claim.evidenceRef ?? null, + }, + previousStatus: fromStatus, + }); + } } private auditLog( diff --git a/app/backend/src/common/guards/api-key.guard.ts b/app/backend/src/common/guards/api-key.guard.ts index b5e86f1..6333c15 100644 --- a/app/backend/src/common/guards/api-key.guard.ts +++ b/app/backend/src/common/guards/api-key.guard.ts @@ -46,7 +46,7 @@ export class ApiKeyGuard implements CanActivate { }); if (record) { - request.user = { role: record.role, ngoId: record.ngoId }; + request.user = { role: record.role, apiKeyId: record.id }; return true; } diff --git a/app/backend/src/types/express.d.ts b/app/backend/src/types/express.d.ts index e233c4e..0a85f8b 100644 --- a/app/backend/src/types/express.d.ts +++ b/app/backend/src/types/express.d.ts @@ -3,7 +3,10 @@ import { AppRole } from '../auth/app-role.enum'; declare global { namespace Express { interface Request { - user?: { role: AppRole; ngoId?: string | null }; + user?: { + role: AppRole; + apiKeyId?: string; + }; } } } diff --git a/app/backend/src/verification/verification.module.ts b/app/backend/src/verification/verification.module.ts index 101ef99..c01e0bd 100644 --- a/app/backend/src/verification/verification.module.ts +++ b/app/backend/src/verification/verification.module.ts @@ -9,6 +9,7 @@ import { VerificationProcessor } from './verification.processor'; import { PrismaModule } from '../prisma/prisma.module'; import { AuditModule } from '../audit/audit.module'; import { NotificationsModule } from '../notifications/notifications.module'; +import { WebhooksModule } from '../webhooks/webhooks.module'; import { EncryptionModule } from '../common/encryption/encryption.module'; @Module({ @@ -18,6 +19,7 @@ import { EncryptionModule } from '../common/encryption/encryption.module'; PrismaModule, AuditModule, NotificationsModule, + WebhooksModule, EncryptionModule, BullModule.registerQueueAsync({ name: 'verification', diff --git a/app/backend/src/verification/verification.service.spec.ts b/app/backend/src/verification/verification.service.spec.ts index f9f67ac..1e182ed 100644 --- a/app/backend/src/verification/verification.service.spec.ts +++ b/app/backend/src/verification/verification.service.spec.ts @@ -8,6 +8,7 @@ import { PrismaService } from '../prisma/prisma.service'; import { AuditService } from '../audit/audit.service'; import { ClaimStatus, Prisma } from '@prisma/client'; import { of } from 'rxjs'; +import { WebhooksService } from '../webhooks/webhooks.service'; describe('VerificationService', () => { let service: VerificationService; @@ -19,20 +20,19 @@ describe('VerificationService', () => { getCompletedCount: jest.Mock; getFailedCount: jest.Mock; }; + const mockWebhooksService = { + enqueueEvent: jest.fn().mockResolvedValue(1), + }; const mockClaim = { id: 'test-claim-id', - status: ClaimStatus.requested, - description: 'Test claim', + status: 'requested', + campaignId: 'campaign-1', + amount: { toString: () => '125.50' }, + recipientRef: 'recipient-1', + evidenceRef: 'https://example.com/evidence.png', createdAt: new Date(), updatedAt: new Date(), - campaignId: 'test-campaign-id', - amount: new Prisma.Decimal(100.0), - recipientRef: 'test-recipient', - evidenceRef: 'test-evidence', - verificationResult: null, - verifiedAt: null, - metadata: null, }; beforeEach(async () => { @@ -87,6 +87,10 @@ describe('VerificationService', () => { post: jest.fn().mockReturnValue(of({ data: {} })), }, }, + { + provide: WebhooksService, + useValue: mockWebhooksService, + }, ], }).compile(); @@ -210,6 +214,15 @@ describe('VerificationService', () => { const updateCall = updateSpy.mock.calls[0]?.[0]; expect(updateCall?.data).toHaveProperty('status'); expect(updateCall?.data?.status).toBe('verified'); + expect(mockWebhooksService.enqueueEvent).toHaveBeenCalledWith( + 'claim.verified', + expect.objectContaining({ + event: 'claim.verified', + claim: expect.objectContaining({ + id: 'test-claim-id', + }), + }), + ); }); }); diff --git a/app/backend/src/verification/verification.service.ts b/app/backend/src/verification/verification.service.ts index d8beeb4..1102b02 100644 --- a/app/backend/src/verification/verification.service.ts +++ b/app/backend/src/verification/verification.service.ts @@ -11,11 +11,7 @@ import { } from './interfaces/verification-job.interface'; import { AuditService } from '../audit/audit.service'; import { firstValueFrom } from 'rxjs'; -import OpenAI from 'openai'; - -// --------------------------------------------------------------------------- -// OCR service types -// --------------------------------------------------------------------------- +import { WebhooksService } from '../webhooks/webhooks.service'; interface OCRFieldResult { value: string; @@ -79,6 +75,7 @@ export class VerificationService { private readonly prisma: PrismaService, private readonly auditService: AuditService, private readonly httpService: HttpService, + private readonly webhooksService: WebhooksService, ) { this.verificationMode = this.configService.get('VERIFICATION_MODE') || 'mock'; @@ -185,13 +182,35 @@ export class VerificationService { const shouldVerify = result.score >= this.verificationThreshold; - await this.prisma.claim.update({ + const previousStatus = claim.status; + const updatedClaim = await this.prisma.claim.update({ where: { id: claimId }, data: { status: shouldVerify ? 'verified' : 'requested', }, }); + if (shouldVerify && previousStatus !== 'verified') { + await this.webhooksService.enqueueEvent('claim.verified', { + event: 'claim.verified', + occurredAt: new Date().toISOString(), + claim: { + id: updatedClaim.id, + campaignId: updatedClaim.campaignId, + status: updatedClaim.status, + amount: + typeof updatedClaim.amount === 'object' && + updatedClaim.amount !== null && + 'toString' in updatedClaim.amount + ? updatedClaim.amount.toString() + : String(updatedClaim.amount), + recipientRef: updatedClaim.recipientRef, + evidenceRef: updatedClaim.evidenceRef ?? null, + }, + previousStatus, + }); + } + this.logger.log( `Claim ${claimId} verification completed – score ${result.score} ` + `(threshold: ${this.verificationThreshold})`, diff --git a/app/backend/src/webhooks/dto/create-webhook-subscription.dto.ts b/app/backend/src/webhooks/dto/create-webhook-subscription.dto.ts new file mode 100644 index 0000000..2d419ca --- /dev/null +++ b/app/backend/src/webhooks/dto/create-webhook-subscription.dto.ts @@ -0,0 +1,57 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { + ArrayNotEmpty, + ArrayUnique, + IsArray, + IsBoolean, + IsIn, + IsOptional, + IsString, + IsUrl, + MaxLength, + MinLength, +} from 'class-validator'; +import { WEBHOOK_EVENTS, WebhookEvent } from '../webhook-events'; + +export class CreateWebhookSubscriptionDto { + @ApiProperty({ + description: 'Destination URL that receives webhook events.', + example: 'https://ngo.example.org/hooks/soter', + }) + @IsUrl({ + require_tld: false, + protocols: ['http', 'https'], + require_protocol: true, + }) + url!: string; + + @ApiProperty({ + description: 'Shared secret used to generate HMAC signatures.', + example: 'ngo_shared_secret_123', + }) + @IsString() + @MinLength(8) + @MaxLength(255) + secret!: string; + + @ApiProperty({ + description: 'Events this subscription should receive.', + enum: WEBHOOK_EVENTS, + isArray: true, + example: ['claim.verified', 'campaign.completed'], + }) + @IsArray() + @ArrayNotEmpty() + @ArrayUnique() + @IsIn(WEBHOOK_EVENTS, { each: true }) + events!: WebhookEvent[]; + + @ApiProperty({ + description: 'Whether the subscription is active.', + required: false, + default: true, + }) + @IsOptional() + @IsBoolean() + isActive?: boolean; +} diff --git a/app/backend/src/webhooks/dto/update-webhook-subscription.dto.ts b/app/backend/src/webhooks/dto/update-webhook-subscription.dto.ts new file mode 100644 index 0000000..be72d7b --- /dev/null +++ b/app/backend/src/webhooks/dto/update-webhook-subscription.dto.ts @@ -0,0 +1,6 @@ +import { PartialType } from '@nestjs/swagger'; +import { CreateWebhookSubscriptionDto } from './create-webhook-subscription.dto'; + +export class UpdateWebhookSubscriptionDto extends PartialType( + CreateWebhookSubscriptionDto, +) {} diff --git a/app/backend/src/webhooks/dto/webhook-subscription-response.dto.ts b/app/backend/src/webhooks/dto/webhook-subscription-response.dto.ts new file mode 100644 index 0000000..376940f --- /dev/null +++ b/app/backend/src/webhooks/dto/webhook-subscription-response.dto.ts @@ -0,0 +1,22 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { WebhookEvent, WEBHOOK_EVENTS } from '../webhook-events'; + +export class WebhookSubscriptionResponseDto { + @ApiProperty({ example: 'ckxyz123' }) + id!: string; + + @ApiProperty({ example: 'https://ngo.example.org/hooks/soter' }) + url!: string; + + @ApiProperty({ enum: WEBHOOK_EVENTS, isArray: true }) + events!: WebhookEvent[]; + + @ApiProperty({ example: true }) + isActive!: boolean; + + @ApiProperty({ example: '2026-03-28T10:00:00.000Z' }) + createdAt!: Date; + + @ApiProperty({ example: '2026-03-28T10:00:00.000Z' }) + updatedAt!: Date; +} diff --git a/app/backend/src/webhooks/interfaces/webhook-job.interface.ts b/app/backend/src/webhooks/interfaces/webhook-job.interface.ts new file mode 100644 index 0000000..acb2d65 --- /dev/null +++ b/app/backend/src/webhooks/interfaces/webhook-job.interface.ts @@ -0,0 +1,12 @@ +import { WebhookEvent } from '../webhook-events'; + +export type WebhookJobData = { + subscriptionId: string; + event: WebhookEvent; + payload: Record; +}; + +export type WebhookDeliveryResult = { + delivered: boolean; + responseStatus: number; +}; diff --git a/app/backend/src/webhooks/webhook-events.ts b/app/backend/src/webhooks/webhook-events.ts new file mode 100644 index 0000000..6198655 --- /dev/null +++ b/app/backend/src/webhooks/webhook-events.ts @@ -0,0 +1,13 @@ +export const WEBHOOK_EVENTS = [ + 'claim.verified', + 'claim.disbursed', + 'campaign.completed', +] as const; + +export type WebhookEvent = (typeof WEBHOOK_EVENTS)[number]; + +export const WEBHOOK_QUEUE = 'webhooks'; + +export function isWebhookEvent(value: string): value is WebhookEvent { + return (WEBHOOK_EVENTS as readonly string[]).includes(value); +} diff --git a/app/backend/src/webhooks/webhooks.controller.ts b/app/backend/src/webhooks/webhooks.controller.ts new file mode 100644 index 0000000..a60dc25 --- /dev/null +++ b/app/backend/src/webhooks/webhooks.controller.ts @@ -0,0 +1,110 @@ +import { + Body, + Controller, + Delete, + ForbiddenException, + Get, + Param, + Patch, + Post, + Req, +} from '@nestjs/common'; +import { + ApiBearerAuth, + ApiCreatedResponse, + ApiOkResponse, + ApiTags, +} from '@nestjs/swagger'; +import { Request } from 'express'; +import { ApiResponseDto } from '../common/dto/api-response.dto'; +import { AppRole } from '../auth/app-role.enum'; +import { Roles } from '../auth/roles.decorator'; +import { CreateWebhookSubscriptionDto } from './dto/create-webhook-subscription.dto'; +import { UpdateWebhookSubscriptionDto } from './dto/update-webhook-subscription.dto'; +import { WebhookSubscriptionResponseDto } from './dto/webhook-subscription-response.dto'; +import { WebhooksService } from './webhooks.service'; + +@ApiTags('Webhooks') +@ApiBearerAuth('JWT-auth') +@Roles(AppRole.ngo, AppRole.admin) +@Controller('webhooks/subscriptions') +export class WebhooksController { + constructor(private readonly webhooksService: WebhooksService) {} + + @Get() + @ApiOkResponse({ + description: 'Webhook subscriptions fetched successfully.', + type: WebhookSubscriptionResponseDto, + isArray: true, + }) + async list(@Req() req: Request) { + const subscriptions = await this.webhooksService.listSubscriptions( + this.requireApiKeyId(req), + ); + return ApiResponseDto.ok( + subscriptions, + 'Webhook subscriptions fetched successfully', + ); + } + + @Post() + @ApiCreatedResponse({ + description: 'Webhook subscription created successfully.', + type: WebhookSubscriptionResponseDto, + }) + async create(@Req() req: Request, @Body() dto: CreateWebhookSubscriptionDto) { + const subscription = await this.webhooksService.createSubscription( + this.requireApiKeyId(req), + dto, + ); + + return ApiResponseDto.ok( + subscription, + 'Webhook subscription created successfully', + ); + } + + @Patch(':id') + @ApiOkResponse({ + description: 'Webhook subscription updated successfully.', + type: WebhookSubscriptionResponseDto, + }) + async update( + @Req() req: Request, + @Param('id') id: string, + @Body() dto: UpdateWebhookSubscriptionDto, + ) { + const subscription = await this.webhooksService.updateSubscription( + this.requireApiKeyId(req), + id, + dto, + ); + + return ApiResponseDto.ok( + subscription, + 'Webhook subscription updated successfully', + ); + } + + @Delete(':id') + @ApiOkResponse({ description: 'Webhook subscription deleted successfully.' }) + async remove(@Req() req: Request, @Param('id') id: string) { + await this.webhooksService.deleteSubscription( + this.requireApiKeyId(req), + id, + ); + return ApiResponseDto.ok(null, 'Webhook subscription deleted successfully'); + } + + private requireApiKeyId(req: Request): string { + const apiKeyId = req.user?.apiKeyId; + + if (!apiKeyId) { + throw new ForbiddenException( + 'A persisted API key is required to manage webhook subscriptions', + ); + } + + return apiKeyId; + } +} diff --git a/app/backend/src/webhooks/webhooks.module.ts b/app/backend/src/webhooks/webhooks.module.ts new file mode 100644 index 0000000..f7d298d --- /dev/null +++ b/app/backend/src/webhooks/webhooks.module.ts @@ -0,0 +1,34 @@ +import { Module } from '@nestjs/common'; +import { BullModule } from '@nestjs/bullmq'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { PrismaModule } from '../prisma/prisma.module'; +import { WEBHOOK_QUEUE } from './webhook-events'; +import { WebhooksController } from './webhooks.controller'; +import { WebhooksProcessor } from './webhooks.processor'; +import { WebhooksService } from './webhooks.service'; + +@Module({ + imports: [ + ConfigModule, + PrismaModule, + BullModule.registerQueueAsync({ + name: WEBHOOK_QUEUE, + imports: [ConfigModule], + useFactory: (configService: ConfigService) => ({ + connection: { + host: configService.get('REDIS_HOST') ?? 'localhost', + port: parseInt(configService.get('REDIS_PORT') ?? '6379', 10), + }, + defaultJobOptions: { + removeOnComplete: 100, + removeOnFail: 100, + }, + }), + inject: [ConfigService], + }), + ], + controllers: [WebhooksController], + providers: [WebhooksService, WebhooksProcessor], + exports: [WebhooksService], +}) +export class WebhooksModule {} diff --git a/app/backend/src/webhooks/webhooks.processor.spec.ts b/app/backend/src/webhooks/webhooks.processor.spec.ts new file mode 100644 index 0000000..2bc19a6 --- /dev/null +++ b/app/backend/src/webhooks/webhooks.processor.spec.ts @@ -0,0 +1,110 @@ +import { Job } from 'bullmq'; +import axios from 'axios'; +import { createHmac } from 'node:crypto'; +import { WebhooksProcessor } from './webhooks.processor'; +import { PrismaService } from '../prisma/prisma.service'; + +jest.mock('axios'); + +describe('WebhooksProcessor', () => { + const mockedAxios = axios as jest.Mocked; + const findSubscription = jest.fn(); + const createDeliveryAttempt = jest.fn(); + + const prisma = { + webhookSubscription: { + findUnique: findSubscription, + }, + webhookDeliveryAttempt: { + create: createDeliveryAttempt, + }, + } as unknown as PrismaService; + + let processor: WebhooksProcessor; + + beforeEach(() => { + jest.clearAllMocks(); + processor = new WebhooksProcessor(prisma); + }); + + it('posts webhook payloads with HMAC signature headers', async () => { + findSubscription.mockResolvedValue({ + id: 'sub-1', + url: 'https://ngo.example.com/hooks', + secret: 'supersecret', + isActive: true, + }); + createDeliveryAttempt.mockResolvedValue({}); + mockedAxios.post.mockResolvedValue({ + status: 202, + data: { accepted: true }, + }); + + const job = { + id: 'job-1', + attemptsMade: 0, + data: { + subscriptionId: 'sub-1', + event: 'claim.disbursed', + payload: { + event: 'claim.disbursed', + claim: { id: 'claim-1', amount: '200.00' }, + }, + }, + } as Job; + + await processor.process(job); + + const [, body, config] = mockedAxios.post.mock.calls[0] ?? []; + const timestamp = config?.headers?.['x-soter-timestamp'] as string; + const signature = config?.headers?.['x-soter-signature'] as string; + const expected = createHmac('sha256', 'supersecret') + .update(`${timestamp}.${JSON.stringify(body)}`) + .digest('hex'); + + expect(config?.headers?.['x-soter-event']).toBe('claim.disbursed'); + expect(signature).toBe(`sha256=${expected}`); + expect(createDeliveryAttempt).toHaveBeenCalledWith({ + data: expect.objectContaining({ + subscriptionId: 'sub-1', + status: 'delivered', + responseStatus: 202, + }), + }); + }); + + it('records failed delivery attempts for retryable errors', async () => { + findSubscription.mockResolvedValue({ + id: 'sub-1', + url: 'https://ngo.example.com/hooks', + secret: 'supersecret', + isActive: true, + }); + createDeliveryAttempt.mockResolvedValue({}); + mockedAxios.post.mockRejectedValue(new Error('socket hang up')); + + const job = { + id: 'job-2', + attemptsMade: 1, + data: { + subscriptionId: 'sub-1', + event: 'claim.verified', + payload: { + event: 'claim.verified', + claim: { id: 'claim-2' }, + }, + }, + } as Job; + + await expect(processor.process(job)).rejects.toThrow('socket hang up'); + + expect(createDeliveryAttempt).toHaveBeenCalledWith({ + data: expect.objectContaining({ + subscriptionId: 'sub-1', + attempt: 2, + status: 'failed', + errorMessage: 'socket hang up', + }), + }); + }); +}); diff --git a/app/backend/src/webhooks/webhooks.processor.ts b/app/backend/src/webhooks/webhooks.processor.ts new file mode 100644 index 0000000..b5ce646 --- /dev/null +++ b/app/backend/src/webhooks/webhooks.processor.ts @@ -0,0 +1,164 @@ +import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; +import { Injectable, Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import axios from 'axios'; +import { createHmac } from 'node:crypto'; +import { Prisma } from '@prisma/client'; +import { PrismaService } from '../prisma/prisma.service'; +import { WEBHOOK_QUEUE } from './webhook-events'; +import { + WebhookDeliveryResult, + WebhookJobData, +} from './interfaces/webhook-job.interface'; + +@Injectable() +@Processor(WEBHOOK_QUEUE, { + concurrency: parseInt(process.env.QUEUE_CONCURRENCY ?? '5', 10), +}) +export class WebhooksProcessor extends WorkerHost { + private readonly logger = new Logger(WebhooksProcessor.name); + + constructor(private readonly prisma: PrismaService) { + super(); + } + + async process( + job: Job, + ): Promise { + const subscription = await this.prisma.webhookSubscription.findUnique({ + where: { id: job.data.subscriptionId }, + }); + + if (!subscription || !subscription.isActive) { + this.logger.warn( + `Skipping webhook job ${job.id} because the subscription is missing or inactive`, + ); + return { delivered: false, responseStatus: 410 }; + } + + const timestamp = new Date().toISOString(); + const body = JSON.stringify(job.data.payload); + const signature = createHmac('sha256', subscription.secret) + .update(`${timestamp}.${body}`) + .digest('hex'); + + const attempt = job.attemptsMade + 1; + + try { + const response = await axios.post(subscription.url, job.data.payload, { + timeout: 10000, + headers: { + 'content-type': 'application/json', + 'x-soter-event': job.data.event, + 'x-soter-signature': `sha256=${signature}`, + 'x-soter-timestamp': timestamp, + 'x-soter-subscription-id': subscription.id, + }, + validateStatus: () => true, + }); + + const responseBody = this.serializeResponseBody(response.data); + + if (response.status >= 400) { + await this.recordAttempt({ + subscriptionId: subscription.id, + event: job.data.event, + payload: job.data.payload, + attempt, + status: 'failed', + responseStatus: response.status, + responseBody, + errorMessage: `Remote endpoint returned HTTP ${response.status}`, + }); + + throw new Error(`Webhook delivery failed with HTTP ${response.status}`); + } + + await this.recordAttempt({ + subscriptionId: subscription.id, + event: job.data.event, + payload: job.data.payload, + attempt, + status: 'delivered', + responseStatus: response.status, + responseBody, + deliveredAt: new Date(), + }); + + return { + delivered: true, + responseStatus: response.status, + }; + } catch (error) { + const message = + error instanceof Error ? error.message : 'Unknown webhook error'; + + if (!message.startsWith('Webhook delivery failed with HTTP')) { + await this.recordAttempt({ + subscriptionId: subscription.id, + event: job.data.event, + payload: job.data.payload, + attempt, + status: 'failed', + errorMessage: message, + }); + } + + this.logger.error( + `Webhook job ${job.id} failed on attempt ${attempt}: ${message}`, + ); + + throw error; + } + } + + @OnWorkerEvent('completed') + onCompleted(job: Job) { + this.logger.log( + `Webhook job ${job.id} delivered ${job.data.event} successfully`, + ); + } + + @OnWorkerEvent('failed') + onFailed(job: Job | undefined, error: Error) { + if (!job) { + this.logger.error(`Webhook worker failure: ${error.message}`); + return; + } + + this.logger.error( + `Webhook job ${job.id} for ${job.data.event} failed: ${error.message}`, + ); + } + + private serializeResponseBody(data: unknown): string | undefined { + if (data === undefined) { + return undefined; + } + + if (typeof data === 'string') { + return data; + } + + return JSON.stringify(data); + } + + private recordAttempt(params: { + subscriptionId: string; + event: string; + payload: Record; + attempt: number; + status: string; + responseStatus?: number; + responseBody?: string; + errorMessage?: string; + deliveredAt?: Date; + }) { + return this.prisma.webhookDeliveryAttempt.create({ + data: { + ...params, + payload: params.payload as Prisma.InputJsonValue, + }, + }); + } +} diff --git a/app/backend/src/webhooks/webhooks.service.spec.ts b/app/backend/src/webhooks/webhooks.service.spec.ts new file mode 100644 index 0000000..d474d6e --- /dev/null +++ b/app/backend/src/webhooks/webhooks.service.spec.ts @@ -0,0 +1,94 @@ +import { Test } from '@nestjs/testing'; +import { getQueueToken } from '@nestjs/bullmq'; +import { NotFoundException } from '@nestjs/common'; +import { WebhooksService } from './webhooks.service'; +import { PrismaService } from '../prisma/prisma.service'; +import { WEBHOOK_QUEUE } from './webhook-events'; + +describe('WebhooksService', () => { + let service: WebhooksService; + + const prisma = { + webhookSubscription: { + create: jest.fn(), + findMany: jest.fn(), + findFirst: jest.fn(), + update: jest.fn(), + delete: jest.fn(), + }, + }; + + const queue = { + add: jest.fn(), + }; + + beforeEach(async () => { + jest.clearAllMocks(); + + const moduleRef = await Test.createTestingModule({ + providers: [ + WebhooksService, + { provide: PrismaService, useValue: prisma }, + { provide: getQueueToken(WEBHOOK_QUEUE), useValue: queue }, + ], + }).compile(); + + service = moduleRef.get(WebhooksService); + }); + + it('creates a subscription for the authenticated NGO api key', async () => { + prisma.webhookSubscription.create.mockResolvedValue({ id: 'sub-1' }); + + await service.createSubscription('api-key-1', { + url: 'https://ngo.example.com/hooks', + secret: 'supersecret', + events: ['claim.verified'], + isActive: true, + }); + + expect(prisma.webhookSubscription.create).toHaveBeenCalledWith({ + data: { + apiKeyId: 'api-key-1', + url: 'https://ngo.example.com/hooks', + secret: 'supersecret', + events: ['claim.verified'], + isActive: true, + }, + }); + }); + + it('enqueues deliveries for all matching active subscriptions', async () => { + prisma.webhookSubscription.findMany.mockResolvedValue([ + { id: 'sub-1' }, + { id: 'sub-2' }, + ]); + + const count = await service.enqueueEvent('claim.verified', { + event: 'claim.verified', + claim: { id: 'claim-1' }, + }); + + expect(count).toBe(2); + expect(queue.add).toHaveBeenCalledTimes(2); + expect(queue.add).toHaveBeenCalledWith( + 'deliver-claim.verified', + expect.objectContaining({ + subscriptionId: 'sub-1', + event: 'claim.verified', + }), + expect.objectContaining({ + attempts: 5, + }), + ); + }); + + it('throws when updating a subscription the NGO does not own', async () => { + prisma.webhookSubscription.findFirst.mockResolvedValue(null); + + await expect( + service.updateSubscription('api-key-1', 'sub-1', { + isActive: false, + }), + ).rejects.toBeInstanceOf(NotFoundException); + }); +}); diff --git a/app/backend/src/webhooks/webhooks.service.ts b/app/backend/src/webhooks/webhooks.service.ts new file mode 100644 index 0000000..8f8f4d9 --- /dev/null +++ b/app/backend/src/webhooks/webhooks.service.ts @@ -0,0 +1,126 @@ +import { Injectable, Logger, NotFoundException } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bullmq'; +import { PrismaService } from '../prisma/prisma.service'; +import { CreateWebhookSubscriptionDto } from './dto/create-webhook-subscription.dto'; +import { UpdateWebhookSubscriptionDto } from './dto/update-webhook-subscription.dto'; +import { WEBHOOK_QUEUE, WebhookEvent } from './webhook-events'; +import { Queue } from 'bullmq'; +import { WebhookJobData } from './interfaces/webhook-job.interface'; + +@Injectable() +export class WebhooksService { + private readonly logger = new Logger(WebhooksService.name); + + constructor( + private readonly prisma: PrismaService, + @InjectQueue(WEBHOOK_QUEUE) + private readonly webhookQueue: Queue, + ) {} + + async createSubscription( + apiKeyId: string, + dto: CreateWebhookSubscriptionDto, + ) { + return this.prisma.webhookSubscription.create({ + data: { + apiKeyId, + url: dto.url, + secret: dto.secret, + events: dto.events, + isActive: dto.isActive ?? true, + }, + }); + } + + async listSubscriptions(apiKeyId: string) { + return this.prisma.webhookSubscription.findMany({ + where: { apiKeyId }, + orderBy: { createdAt: 'desc' }, + }); + } + + async updateSubscription( + apiKeyId: string, + id: string, + dto: UpdateWebhookSubscriptionDto, + ) { + await this.ensureOwnedSubscription(apiKeyId, id); + + return this.prisma.webhookSubscription.update({ + where: { id }, + data: { + url: dto.url, + secret: dto.secret, + events: dto.events, + isActive: dto.isActive, + }, + }); + } + + async deleteSubscription(apiKeyId: string, id: string) { + await this.ensureOwnedSubscription(apiKeyId, id); + await this.prisma.webhookSubscription.delete({ where: { id } }); + + return { deleted: true }; + } + + async enqueueEvent( + event: WebhookEvent, + payload: Record, + ): Promise { + const subscriptions = await this.prisma.webhookSubscription.findMany({ + where: { + isActive: true, + events: { + has: event, + }, + }, + select: { id: true }, + }); + + if (subscriptions.length === 0) { + this.logger.debug(`No webhook subscriptions registered for ${event}`); + return 0; + } + + await Promise.all( + subscriptions.map(subscription => + this.webhookQueue.add( + `deliver-${event}`, + { + subscriptionId: subscription.id, + event, + payload, + }, + { + attempts: 5, + backoff: { + type: 'exponential', + delay: 5000, + }, + removeOnComplete: 100, + removeOnFail: 100, + }, + ), + ), + ); + + this.logger.log( + `Enqueued ${subscriptions.length} webhook deliveries for ${event}`, + ); + + return subscriptions.length; + } + + private async ensureOwnedSubscription(apiKeyId: string, id: string) { + const subscription = await this.prisma.webhookSubscription.findFirst({ + where: { id, apiKeyId }, + }); + + if (!subscription) { + throw new NotFoundException('Webhook subscription not found'); + } + + return subscription; + } +}