From 73ebca3d0cef0776530c527f7982bd4722ebddea Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Wed, 10 Jun 2026 11:24:47 -0300 Subject: [PATCH 1/7] feat(broker): add getTopicLag for consumer lag observability (EVO-1217) Kafka sums high-watermark minus committed offset across partitions for the mode's consumer group; RabbitMQ reads the ready count of the mode's queue. Feeds the consumer_lag gauge required by NFR33. Co-Authored-By: Claude Fable 5 --- .../broker/adapters/kafka-broker.adapter.ts | 29 +++++++++++++++++++ .../adapters/rabbitmq-broker.adapter.ts | 7 +++++ .../interfaces/message-broker.interface.ts | 8 +++++ 3 files changed, 44 insertions(+) diff --git a/src/shared/broker/adapters/kafka-broker.adapter.ts b/src/shared/broker/adapters/kafka-broker.adapter.ts index 13dd658..bd26bc6 100644 --- a/src/shared/broker/adapters/kafka-broker.adapter.ts +++ b/src/shared/broker/adapters/kafka-broker.adapter.ts @@ -204,6 +204,35 @@ export class KafkaBrokerAdapter await this.ensureTopicExists(topic); } + async getTopicLag(topic: string): Promise { + this.assertActive('getTopicLag'); + + const groupId = `${this.resolveRunMode(topic)}-${topic}`; + const [latest, committed] = await Promise.all([ + this.admin!.fetchTopicOffsets(topic), + this.admin!.fetchOffsets({ groupId, topics: [topic] }), + ]); + + const committedByPartition = new Map(); + for (const entry of committed) { + for (const partition of entry.partitions) { + committedByPartition.set(partition.partition, partition.offset); + } + } + + let lag = 0; + for (const partition of latest) { + const committedOffset = committedByPartition.get(partition.partition); + // '-1' means the group never committed on this partition; with + // fromBeginning=false the consumer starts at latest, so lag is 0. + if (committedOffset === undefined || Number(committedOffset) < 0) { + continue; + } + lag += Math.max(Number(partition.offset) - Number(committedOffset), 0); + } + return lag; + } + async subscribePattern( prefix: string, handler: (msg: BrokerMessage) => Promise, diff --git a/src/shared/broker/adapters/rabbitmq-broker.adapter.ts b/src/shared/broker/adapters/rabbitmq-broker.adapter.ts index ace83fa..50317b2 100644 --- a/src/shared/broker/adapters/rabbitmq-broker.adapter.ts +++ b/src/shared/broker/adapters/rabbitmq-broker.adapter.ts @@ -228,6 +228,13 @@ export class RabbitMQBrokerAdapter await this.channel!.assertQueue(topic, { durable: true }); } + async getTopicLag(topic: string): Promise { + this.assertActive('getTopicLag'); + const queueName = `${this.resolveRunMode(topic)}-${topic}`; + const { messageCount } = await this.channel!.checkQueue(queueName); + return messageCount; + } + async subscribePattern( prefix: string, handler: (msg: BrokerMessage) => Promise, diff --git a/src/shared/broker/interfaces/message-broker.interface.ts b/src/shared/broker/interfaces/message-broker.interface.ts index 31837f4..be6f6f9 100644 --- a/src/shared/broker/interfaces/message-broker.interface.ts +++ b/src/shared/broker/interfaces/message-broker.interface.ts @@ -36,6 +36,14 @@ export interface IMessageBroker { * for explicit deploy-time provisioning (EVO-1200). Safe to call repeatedly. */ provisionTopic(topic: string): Promise; + /** + * Best-effort consumer lag for this process's consumer group/queue on + * `topic` (Kafka: sum of partition high-watermark minus committed offset; + * RabbitMQ: ready message count of the `${RUN_MODE}-${topic}` queue). Feeds + * the `consumer_lag` gauge (NFR33); callers must tolerate a rejection and + * never let a failed poll disturb message processing. + */ + getTopicLag(topic: string): Promise; } export const IMESSAGE_BROKER: unique symbol = Symbol('IMessageBroker'); From e1f53b5fcf99ef355b5794b25b53b59127c9c6f2 Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Wed, 10 Jun 2026 11:24:47 -0300 Subject: [PATCH 2/7] feat(campaign-sender): batch dispatcher with template rendering via CRM inbox (EVO-1217) Loads the batch's MessageTemplate once (missing template is terminal), renders legacy-compatible placeholders per contact and delegates delivery to the shared CrmInboxDispatcher (story 2.2). Co-Authored-By: Claude Fable 5 --- .../errors/campaign-not-configured.error.ts | 17 +++ .../errors/campaign-not-found.error.ts | 15 +++ .../services/batch-dispatcher.service.spec.ts | 120 ++++++++++++++++++ .../services/batch-dispatcher.service.ts | 100 +++++++++++++++ 4 files changed, 252 insertions(+) create mode 100644 src/runners/campaign-sender/errors/campaign-not-configured.error.ts create mode 100644 src/runners/campaign-sender/errors/campaign-not-found.error.ts create mode 100644 src/runners/campaign-sender/services/batch-dispatcher.service.spec.ts create mode 100644 src/runners/campaign-sender/services/batch-dispatcher.service.ts diff --git a/src/runners/campaign-sender/errors/campaign-not-configured.error.ts b/src/runners/campaign-sender/errors/campaign-not-configured.error.ts new file mode 100644 index 0000000..443c9b2 --- /dev/null +++ b/src/runners/campaign-sender/errors/campaign-not-configured.error.ts @@ -0,0 +1,17 @@ +import { TerminalError } from '../../../shared/errors/terminal-error'; + +/** + * Terminal error for a `campaigns.send` message that can never be dispatched + * because the campaign is missing required configuration (no inboxId, message + * template deleted). Redelivery cannot fix it, so the consumer drops the + * message to the DLQ; contacts stay PENDING for a manual re-publish once the + * configuration is repaired. + */ +export class CampaignNotConfiguredError extends TerminalError { + readonly campaignId: string; + + constructor(campaignId: string, reason: string) { + super(`Campaign ${campaignId} is not dispatchable: ${reason}`); + this.campaignId = campaignId; + } +} diff --git a/src/runners/campaign-sender/errors/campaign-not-found.error.ts b/src/runners/campaign-sender/errors/campaign-not-found.error.ts new file mode 100644 index 0000000..67439cc --- /dev/null +++ b/src/runners/campaign-sender/errors/campaign-not-found.error.ts @@ -0,0 +1,15 @@ +import { TerminalError } from '../../../shared/errors/terminal-error'; + +/** + * Terminal error for a `campaigns.send` message whose `campaignId` does not + * resolve to a Campaign row. The consumer maps it to `nack(requeue=false)` — + * requeueing would loop forever since the campaign will never appear. + */ +export class CampaignNotFoundError extends TerminalError { + readonly campaignId: string; + + constructor(campaignId: string) { + super(`Campaign ${campaignId} not found`); + this.campaignId = campaignId; + } +} diff --git a/src/runners/campaign-sender/services/batch-dispatcher.service.spec.ts b/src/runners/campaign-sender/services/batch-dispatcher.service.spec.ts new file mode 100644 index 0000000..cc0c9e7 --- /dev/null +++ b/src/runners/campaign-sender/services/batch-dispatcher.service.spec.ts @@ -0,0 +1,120 @@ +import { BatchDispatcherService } from './batch-dispatcher.service'; +import { CampaignNotConfiguredError } from '../errors/campaign-not-configured.error'; +import { MessageTemplate } from '../../../shared/entities/message-template.entity'; +import type { HydratedContact } from '../../../shared/crm-client/types/contact'; + +const template = { + id: 'tpl-1', + name: 'welcome', + content: 'Hi {contact.name}, your plan is {{contact.plan}}', + language: 'pt_BR', + category: 'marketing', + variables: [{ key: 'plan' }], +} as unknown as MessageTemplate; + +const contact: HydratedContact = { + id: 'contact-1', + name: 'Ana', + email: 'ana@example.com', + phoneNumber: '+5511999999999', + blocked: false, + customAttributes: { plan: 'pro' }, + additionalAttributes: {}, +}; + +describe('BatchDispatcherService', () => { + let service: BatchDispatcherService; + let findOne: jest.Mock; + let dispatch: jest.Mock; + + beforeEach(() => { + findOne = jest.fn(); + dispatch = jest.fn(); + const db = { getRepository: () => ({ findOne }) }; + service = new BatchDispatcherService(db as any, { dispatch } as any); + }); + + describe('loadTemplate', () => { + it('returns the template when it exists', async () => { + findOne.mockResolvedValueOnce(template); + + await expect(service.loadTemplate('camp-1', 'tpl-1')).resolves.toBe( + template, + ); + expect(findOne).toHaveBeenCalledWith({ where: { id: 'tpl-1' } }); + }); + + it('throws a terminal CampaignNotConfiguredError when missing', async () => { + findOne.mockResolvedValueOnce(null); + + await expect(service.loadTemplate('camp-1', 'tpl-x')).rejects.toThrow( + CampaignNotConfiguredError, + ); + }); + }); + + describe('dispatch', () => { + it('delegates to CrmInboxDispatcher with rendered content and template params', async () => { + dispatch.mockResolvedValueOnce({ success: true, latencyMs: 10 }); + + await service.dispatch({ + campaignId: 'camp-1', + inboxId: 'inbox-1', + template, + contact, + }); + + expect(dispatch).toHaveBeenCalledWith({ + contactId: 'contact-1', + inboxId: 'inbox-1', + content: 'Hi Ana, your plan is pro', + campaignId: 'camp-1', + templateParams: { + name: 'welcome', + category: 'marketing', + language: 'pt_BR', + processed_params: [{ key: 'plan' }], + }, + }); + }); + + it('returns the dispatcher result untouched', async () => { + const result = { + success: false, + statusCode: 422, + error: { code: '422', message: 'invalid contact' }, + latencyMs: 5, + }; + dispatch.mockResolvedValueOnce(result); + + await expect( + service.dispatch({ + campaignId: 'camp-1', + inboxId: 'inbox-1', + template, + contact, + }), + ).resolves.toBe(result); + }); + + it('renders empty string for missing contact fields and attributes', async () => { + dispatch.mockResolvedValueOnce({ success: true, latencyMs: 1 }); + const sparse: HydratedContact = { + id: 'contact-2', + name: '', + blocked: false, + customAttributes: { plan: null }, + additionalAttributes: {}, + }; + + await service.dispatch({ + campaignId: 'camp-1', + inboxId: 'inbox-1', + template, + contact: sparse, + }); + + expect(dispatch.mock.calls[0][0].content).toBe('Hi , your plan is '); + }); + }); +}); diff --git a/src/runners/campaign-sender/services/batch-dispatcher.service.ts b/src/runners/campaign-sender/services/batch-dispatcher.service.ts new file mode 100644 index 0000000..c3ce295 --- /dev/null +++ b/src/runners/campaign-sender/services/batch-dispatcher.service.ts @@ -0,0 +1,100 @@ +import { Injectable } from '@nestjs/common'; +import { Repository } from 'typeorm'; +import { MessageTemplate } from '../../../shared/entities/message-template.entity'; +import { TenantDbContext } from '../../../evo-extension-points'; +import { CrmInboxDispatcher } from '../../../shared/messaging-channels/dispatchers/crm-inbox.dispatcher'; +import type { DispatchResult } from '../../../shared/messaging-channels/interfaces/channel-dispatcher.interface'; +import type { HydratedContact } from '../../../shared/crm-client/types/contact'; +import { CampaignNotConfiguredError } from '../errors/campaign-not-configured.error'; + +export interface BatchDispatchInput { + campaignId: string; + inboxId: string; + template: MessageTemplate; + contact: HydratedContact; +} + +/** + * Batch-scoped dispatch helper for the campaign-sender (story 4.3 / EVO-1217): + * loads the batch's MessageTemplate once, renders per-contact content and + * delegates HTTP delivery to the shared CrmInboxDispatcher (story 2.2). The + * dispatch path is channel-agnostic — the channel is carried by `inboxId`. + * + * Content rendering mirrors the legacy CampaignMessageSenderService + * placeholder semantics ({contact.name} / {{contact.name}} / custom + * attributes); the legacy path is removed by story 5.5. + */ +@Injectable() +export class BatchDispatcherService { + constructor( + private readonly db: TenantDbContext, + private readonly crmInboxDispatcher: CrmInboxDispatcher, + ) {} + + private get messageTemplateRepository(): Repository { + return this.db.getRepository(MessageTemplate); + } + + /** + * Load the batch's template once. A missing template is terminal: every + * contact in the batch would fail identically, so the whole message is + * dropped to the DLQ instead of burning one FAILED row per contact. + */ + async loadTemplate( + campaignId: string, + templateId: string, + ): Promise { + const template = await this.messageTemplateRepository.findOne({ + where: { id: templateId }, + }); + if (!template) { + throw new CampaignNotConfiguredError( + campaignId, + `message template ${templateId} not found`, + ); + } + return template; + } + + async dispatch(input: BatchDispatchInput): Promise { + const { campaignId, inboxId, template, contact } = input; + return this.crmInboxDispatcher.dispatch({ + contactId: contact.id, + inboxId, + content: this.renderContent(template.content, contact), + campaignId, + templateParams: { + name: template.name, + category: template.category || undefined, + language: template.language || 'pt_BR', + // Entity types `variables` as jsonb array; the legacy sender forwards + // it verbatim as processed_params, so preserve the wire shape. + processed_params: (template.variables ?? {}) as unknown as Record< + string, + unknown + >, + }, + }); + } + + private renderContent(content: string, contact: HydratedContact): string { + const values: Record = { + 'contact.name': contact.name || '', + 'contact.email': contact.email || '', + 'contact.phone': contact.phoneNumber || '', + }; + for (const [key, value] of Object.entries(contact.customAttributes ?? {})) { + values[`contact.${key}`] = + value === null || value === undefined ? '' : String(value); + } + + let rendered = content; + for (const [key, value] of Object.entries(values)) { + // Double-brace form first, otherwise `{key}` would eat the inner braces + // of `{{key}}` and leave a stray `{...}` around the value. + rendered = rendered.replaceAll(`{{${key}}}`, value); + rendered = rendered.replaceAll(`{${key}}`, value); + } + return rendered; + } +} From 825990309f5eb2f7fdc2a182e5da85057d9cc7b0 Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Wed, 10 Jun 2026 11:24:47 -0300 Subject: [PATCH 3/7] feat(campaign-sender): sender service with tabular idempotency and pause recheck (EVO-1217) Processes one campaigns.send page: skips non-PENDING contacts (FR30/NFR16), rechecks Campaign.status before every dispatch through a 5s TTL cache (FR21-FR24) and records SENT/FAILED via updates conditional on PENDING so replica races never double-mark. 4xx/5xx fail immediately - retry is 4.5. Co-Authored-By: Claude Fable 5 --- .../services/campaign-sender.service.spec.ts | 309 ++++++++++++++++ .../services/campaign-sender.service.ts | 334 ++++++++++++++++++ 2 files changed, 643 insertions(+) create mode 100644 src/runners/campaign-sender/services/campaign-sender.service.spec.ts create mode 100644 src/runners/campaign-sender/services/campaign-sender.service.ts diff --git a/src/runners/campaign-sender/services/campaign-sender.service.spec.ts b/src/runners/campaign-sender/services/campaign-sender.service.spec.ts new file mode 100644 index 0000000..7d8cdd1 --- /dev/null +++ b/src/runners/campaign-sender/services/campaign-sender.service.spec.ts @@ -0,0 +1,309 @@ +import { CampaignSenderService } from './campaign-sender.service'; +import { CampaignNotFoundError } from '../errors/campaign-not-found.error'; +import { CampaignNotConfiguredError } from '../errors/campaign-not-configured.error'; +import { + Campaign, + CampaignStatus, +} from '../../../modules/campaigns/entities/campaign.entity'; +import { + CampaignContact, + CampaignContactStatus, +} from '../../../modules/campaigns/entities/campaign-contact.entity'; +import type { CampaignsSendContract } from '../../../shared/broker/contracts/campaigns-send.contract'; +import type { ContactDto } from '../../../shared/crm-client/types/contact'; + +const CAMPAIGN_ID = 'camp-1'; + +const payload = (contactIds: [string, ...string[]]): CampaignsSendContract => ({ + campaignId: CAMPAIGN_ID, + page: 1, + totalPages: 1, + contactIds, + templateId: 'tpl-1', + channelType: 'whatsapp', + correlationId: '11111111-1111-4111-8111-111111111111', +}); + +const campaign = (status = CampaignStatus.SENDING): Campaign => + ({ id: CAMPAIGN_ID, status, inboxId: 'inbox-1' }) as Campaign; + +const row = ( + contactId: string, + status = CampaignContactStatus.PENDING, +): CampaignContact => + ({ + id: `cc-${contactId}`, + campaignId: CAMPAIGN_ID, + contactId, + status, + }) as CampaignContact; + +const dto = (id: string, blocked = false): ContactDto => + ({ id, name: `Contact ${id}`, blocked }) as unknown as ContactDto; + +describe('CampaignSenderService', () => { + let service: CampaignSenderService; + let campaignFindOne: jest.Mock; + let contactFind: jest.Mock; + let contactUpdate: jest.Mock; + let findByIds: jest.Mock; + let findById: jest.Mock; + let loadTemplate: jest.Mock; + let dispatch: jest.Mock; + let logger: { log: jest.Mock; warn: jest.Mock; error: jest.Mock }; + let metrics: { incError: jest.Mock; incThroughput: jest.Mock }; + + const template = { id: 'tpl-1', name: 'welcome' }; + + beforeEach(() => { + campaignFindOne = jest.fn(); + contactFind = jest.fn(); + contactUpdate = jest.fn().mockResolvedValue({ affected: 1 }); + findByIds = jest.fn().mockResolvedValue([]); + findById = jest.fn().mockResolvedValue(null); + loadTemplate = jest.fn().mockResolvedValue(template); + dispatch = jest.fn().mockResolvedValue({ success: true, latencyMs: 5 }); + logger = { log: jest.fn(), warn: jest.fn(), error: jest.fn() }; + metrics = { incError: jest.fn(), incThroughput: jest.fn() }; + + const db = { + getRepository: (entity: unknown) => + entity === Campaign + ? { findOne: campaignFindOne } + : { find: contactFind, update: contactUpdate }, + }; + + service = new CampaignSenderService( + db as any, + { findByIds, findById } as any, + logger as any, + metrics as any, + { loadTemplate, dispatch } as any, + ); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + it('AC1: dispatches every PENDING contact and marks each SENT with sentAt', async () => { + const ids: [string, ...string[]] = ['c1', 'c2', 'c3']; + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue(ids.map((id) => row(id))); + findByIds.mockResolvedValue(ids.map((id) => dto(id))); + + const result = await service.send(payload(ids)); + + expect(dispatch).toHaveBeenCalledTimes(3); + expect(contactUpdate).toHaveBeenCalledTimes(3); + for (const id of ids) { + expect(contactUpdate).toHaveBeenCalledWith( + { id: `cc-${id}`, status: CampaignContactStatus.PENDING }, + { + status: CampaignContactStatus.SENT, + sentAt: expect.any(Date), + }, + ); + } + expect(metrics.incThroughput).toHaveBeenCalledTimes(3); + expect(result).toEqual({ + dispatched: 3, + skipped: 0, + failed: 0, + aborted: false, + }); + }); + + it('AC2: skips an already-SENT contact with the canonical log line', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([ + row('c1', CampaignContactStatus.SENT), + row('c2'), + ]); + findByIds.mockResolvedValue([dto('c1'), dto('c2')]); + + const result = await service.send(payload(['c1', 'c2'])); + + expect(dispatch).toHaveBeenCalledTimes(1); + expect(logger.log).toHaveBeenCalledWith( + 'skipped: already sent', + expect.objectContaining({ contactId: 'c1' }), + ); + expect(result.skipped).toBe(1); + expect(result.dispatched).toBe(1); + }); + + it('AC3: aborts mid-batch when the campaign flips to Paused after the status cache expires', async () => { + campaignFindOne + .mockResolvedValueOnce(campaign(CampaignStatus.SENDING)) + .mockResolvedValue({ id: CAMPAIGN_ID, status: CampaignStatus.PAUSED }); + contactFind.mockResolvedValue([row('c1'), row('c2')]); + findByIds.mockResolvedValue([dto('c1'), dto('c2')]); + + // First contact reads the warm cache; expire it before the second. + let now = 1_000_000; + jest.spyOn(Date, 'now').mockImplementation(() => now); + dispatch.mockImplementation(() => { + now += 6_000; + return Promise.resolve({ success: true, latencyMs: 5 }); + }); + + const result = await service.send(payload(['c1', 'c2'])); + + expect(dispatch).toHaveBeenCalledTimes(1); + expect(logger.warn).toHaveBeenCalledWith( + 'aborted: campaign paused', + expect.objectContaining({ campaignId: CAMPAIGN_ID }), + ); + expect(result).toEqual({ + dispatched: 1, + skipped: 0, + failed: 0, + aborted: true, + }); + }); + + it('AC4: marks the contact FAILED on a 4xx dispatch and logs the reason', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([row('c1')]); + findByIds.mockResolvedValue([dto('c1')]); + dispatch.mockResolvedValue({ + success: false, + statusCode: 422, + error: { code: '422', message: 'CRM API error: 422 - invalid' }, + latencyMs: 5, + }); + + const result = await service.send(payload(['c1'])); + + expect(contactUpdate).toHaveBeenCalledWith( + { id: 'cc-c1', status: CampaignContactStatus.PENDING }, + { status: CampaignContactStatus.FAILED }, + ); + expect(logger.error).toHaveBeenCalledWith( + 'dispatch failed', + expect.objectContaining({ + contactId: 'c1', + statusCode: 422, + reason: 'CRM API error: 422 - invalid', + }), + ); + expect(metrics.incError).toHaveBeenCalledWith('dispatch_4xx'); + expect(result.failed).toBe(1); + }); + + it('marks FAILED with dispatch_5xx category on a server error (no retry in this story)', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([row('c1')]); + findByIds.mockResolvedValue([dto('c1')]); + dispatch.mockResolvedValue({ + success: false, + statusCode: 503, + error: { code: '503', message: 'CRM API error: 503' }, + latencyMs: 5, + }); + + const result = await service.send(payload(['c1'])); + + expect(metrics.incError).toHaveBeenCalledWith('dispatch_5xx'); + expect(result.failed).toBe(1); + }); + + it('throws terminal CampaignNotFoundError when the campaign does not exist', async () => { + campaignFindOne.mockResolvedValue(null); + + await expect(service.send(payload(['c1']))).rejects.toThrow( + CampaignNotFoundError, + ); + }); + + it('throws terminal CampaignNotConfiguredError when the campaign has no inbox', async () => { + campaignFindOne.mockResolvedValue({ + id: CAMPAIGN_ID, + status: CampaignStatus.SENDING, + inboxId: null, + }); + + await expect(service.send(payload(['c1']))).rejects.toThrow( + CampaignNotConfiguredError, + ); + }); + + it('aborts upfront without loading the template when the campaign is already Paused', async () => { + campaignFindOne.mockResolvedValue(campaign(CampaignStatus.PAUSED)); + + const result = await service.send(payload(['c1'])); + + expect(result.aborted).toBe(true); + expect(loadTemplate).not.toHaveBeenCalled(); + expect(dispatch).not.toHaveBeenCalled(); + }); + + it('fails a contact that the CRM confirms missing (404 on direct lookup)', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([row('c1')]); + findByIds.mockResolvedValue([]); + findById.mockResolvedValue(null); + + const result = await service.send(payload(['c1'])); + + expect(findById).toHaveBeenCalledWith('c1'); + expect(metrics.incError).toHaveBeenCalledWith('contact_not_found'); + expect(result.failed).toBe(1); + expect(dispatch).not.toHaveBeenCalled(); + }); + + it('propagates a transient CRM error on direct lookup so the batch requeues', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([row('c1')]); + findByIds.mockResolvedValue([]); + findById.mockRejectedValue(new Error('CRM unavailable')); + + await expect(service.send(payload(['c1']))).rejects.toThrow( + 'CRM unavailable', + ); + expect(contactUpdate).not.toHaveBeenCalled(); + }); + + it('skips a blocked contact as SKIPPED without dispatching', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([row('c1')]); + findByIds.mockResolvedValue([dto('c1', true)]); + + const result = await service.send(payload(['c1'])); + + expect(dispatch).not.toHaveBeenCalled(); + expect(contactUpdate).toHaveBeenCalledWith( + { id: 'cc-c1', status: CampaignContactStatus.PENDING }, + { status: CampaignContactStatus.SKIPPED }, + ); + expect(result.skipped).toBe(1); + }); + + it('counts a lost claim race as skipped when the conditional SENT update hits 0 rows', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([row('c1')]); + findByIds.mockResolvedValue([dto('c1')]); + contactUpdate.mockResolvedValue({ affected: 0 }); + + const result = await service.send(payload(['c1'])); + + expect(result.dispatched).toBe(0); + expect(result.skipped).toBe(1); + expect(logger.warn).toHaveBeenCalledWith( + 'skipped: already sent (lost claim race)', + expect.objectContaining({ contactId: 'c1' }), + ); + }); + + it('skips ids without a campaign_contact row', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([]); + findByIds.mockResolvedValue([dto('c1')]); + + const result = await service.send(payload(['c1'])); + + expect(dispatch).not.toHaveBeenCalled(); + expect(result.skipped).toBe(1); + }); +}); diff --git a/src/runners/campaign-sender/services/campaign-sender.service.ts b/src/runners/campaign-sender/services/campaign-sender.service.ts new file mode 100644 index 0000000..e7cbba1 --- /dev/null +++ b/src/runners/campaign-sender/services/campaign-sender.service.ts @@ -0,0 +1,334 @@ +import { Injectable } from '@nestjs/common'; +import { In, Repository } from 'typeorm'; +import { + Campaign, + CampaignStatus, +} from '../../../modules/campaigns/entities/campaign.entity'; +import { + CampaignContact, + CampaignContactStatus, +} from '../../../modules/campaigns/entities/campaign-contact.entity'; +import { TenantDbContext } from '../../../evo-extension-points'; +import { CustomLoggerService } from '../../../common/services/custom-logger.service'; +import { ContactsClientService } from '../../../shared/crm-client/contacts-client.service'; +import { + mapContactDto, + type HydratedContact, +} from '../../../shared/crm-client/types/contact'; +import { PipelineMetricsService } from '../../../shared/metrics/pipeline-metrics.service'; +import type { CampaignsSendContract } from '../../../shared/broker/contracts/campaigns-send.contract'; +import { BatchDispatcherService } from './batch-dispatcher.service'; +import { CampaignNotFoundError } from '../errors/campaign-not-found.error'; +import { CampaignNotConfiguredError } from '../errors/campaign-not-configured.error'; + +export interface SendResult { + dispatched: number; + skipped: number; + failed: number; + aborted: boolean; +} + +const DEFAULT_STATUS_CACHE_TTL_MS = 5_000; + +const DISPATCHABLE_STATUSES = new Set([ + CampaignStatus.SENDING, + CampaignStatus.SENDING_TESTAB, +]); + +/** + * Consumer-side core of the campaign dispatch pipeline (story 4.3 / EVO-1217): + * processes one `campaigns.send` page — hydrates the batch's contacts from the + * CRM, dispatches each PENDING contact through the shared CRM inbox dispatcher + * and records the outcome on `CampaignContact.status` (FR5). + * + * Idempotency is tabular (FR30, NFR16): `status` is the lock. Non-PENDING + * contacts are skipped, and the SENT/FAILED updates are conditional on + * `status='PENDING'` so a redelivery or replica race never double-marks a row. + * + * Pause/stop (FR21–FR24) is honored by rechecking `Campaign.status` before + * every dispatch through a per-instance TTL cache — eventually consistent by + * design (NFR5 allows ≤30s propagation), trading staleness for not hammering + * Postgres once per contact. + * + * Out of scope here by contract: rate limiting (4.4), retry with backoff (4.5, + * a failed dispatch is FAILED right away), `campaigns.tracked` publishing (4.6) + * and `campaigns.control` consumption (4.8). + */ +@Injectable() +export class CampaignSenderService { + private readonly statusCache = new Map< + string, + { status: CampaignStatus; fetchedAt: number } + >(); + + constructor( + private readonly db: TenantDbContext, + private readonly contactsClient: ContactsClientService, + private readonly logger: CustomLoggerService, + private readonly metrics: PipelineMetricsService, + private readonly batchDispatcher: BatchDispatcherService, + ) {} + + private get campaignRepository(): Repository { + return this.db.getRepository(Campaign); + } + + private get campaignContactRepository(): Repository { + return this.db.getRepository(CampaignContact); + } + + async send(payload: CampaignsSendContract): Promise { + const { campaignId, page, totalPages } = payload; + const result: SendResult = { + dispatched: 0, + skipped: 0, + failed: 0, + aborted: false, + }; + + const campaign = await this.campaignRepository.findOne({ + where: { id: campaignId }, + }); + if (!campaign) { + throw new CampaignNotFoundError(campaignId); + } + if (!campaign.inboxId) { + throw new CampaignNotConfiguredError( + campaignId, + 'campaign has no inboxId', + ); + } + + this.cacheStatus(campaignId, campaign.status); + if (!DISPATCHABLE_STATUSES.has(campaign.status)) { + result.aborted = true; + this.logAborted(campaign.status, { campaignId, page }); + return result; + } + + const template = await this.batchDispatcher.loadTemplate( + campaignId, + payload.templateId, + ); + + const rows = await this.campaignContactRepository.find({ + where: { campaignId, contactId: In(payload.contactIds) }, + }); + const rowByContactId = new Map(rows.map((row) => [row.contactId, row])); + const contacts = await this.hydrateContacts(payload.contactIds); + + for (const contactId of payload.contactIds) { + const row = rowByContactId.get(contactId); + if (!row) { + result.skipped++; + this.logger.warn('skipped: no campaign_contact row', { + campaignId, + contactId, + }); + continue; + } + + if (row.status !== CampaignContactStatus.PENDING) { + result.skipped++; + this.logger.log('skipped: already sent', { + campaignId, + contactId, + status: row.status, + }); + continue; + } + + const currentStatus = await this.currentCampaignStatus(campaignId); + if (!DISPATCHABLE_STATUSES.has(currentStatus)) { + result.aborted = true; + this.logAborted(currentStatus, { campaignId, page }); + break; + } + + const contact = await this.resolveContact(contacts, contactId); + if (!contact) { + await this.markFailed(row, 'contact_not_found'); + this.metrics.incError('contact_not_found'); + result.failed++; + continue; + } + + if (contact.blocked) { + await this.markSkipped(row); + result.skipped++; + this.logger.log('skipped: contact blocked', { campaignId, contactId }); + continue; + } + + const dispatch = await this.batchDispatcher.dispatch({ + campaignId, + inboxId: campaign.inboxId, + template, + contact, + }); + + if (dispatch.success) { + const claimed = await this.markSent(row); + if (claimed) { + result.dispatched++; + this.metrics.incThroughput(); + } else { + // Another replica claimed the row between our read and the update. + result.skipped++; + this.logger.warn('skipped: already sent (lost claim race)', { + campaignId, + contactId, + }); + } + } else { + await this.markFailed( + row, + dispatch.error?.message ?? 'unknown dispatch error', + dispatch.statusCode, + ); + this.metrics.incError(this.dispatchErrorCategory(dispatch.statusCode)); + result.failed++; + } + } + + this.logger.log('campaign.batch.processed', { + campaignId, + page, + totalPages, + ...result, + }); + return result; + } + + /** + * Hydrate the batch upfront with the pooled `findByIds` (10 concurrent, LRU + * cached). It swallows per-contact transport errors as "missing", so a + * missing id alone cannot distinguish a real 404 from a CRM outage — + * `resolveContact` re-checks those before they are marked FAILED. + */ + private async hydrateContacts( + contactIds: string[], + ): Promise> { + const dtos = await this.contactsClient.findByIds(contactIds); + const contacts = new Map(); + for (const dto of dtos) { + const contact = mapContactDto(dto); + if (contact) contacts.set(contact.id, contact); + } + return contacts; + } + + /** + * A contact absent from the bulk hydration gets one direct lookup before + * being failed: `findById` returns null on a genuine 404 (→ FAILED is + * correct) but THROWS on a transient CRM error, which propagates to the + * consumer as a requeue — so a CRM outage redelivers the batch instead of + * mass-failing contacts that still exist. + */ + private async resolveContact( + contacts: Map, + contactId: string, + ): Promise { + const cached = contacts.get(contactId); + if (cached) return cached; + return mapContactDto(await this.contactsClient.findById(contactId)); + } + + private async currentCampaignStatus( + campaignId: string, + ): Promise { + const cached = this.statusCache.get(campaignId); + if (cached && Date.now() - cached.fetchedAt < this.statusCacheTtlMs()) { + return cached.status; + } + const row = await this.campaignRepository.findOne({ + where: { id: campaignId }, + select: { id: true, status: true }, + }); + // A campaign deleted mid-batch behaves like a stop. + const status = row?.status ?? CampaignStatus.STOPPED; + this.cacheStatus(campaignId, status); + return status; + } + + private cacheStatus(campaignId: string, status: CampaignStatus): void { + this.statusCache.set(campaignId, { status, fetchedAt: Date.now() }); + } + + private statusCacheTtlMs(): number { + const parsed = parseInt( + process.env.CAMPAIGN_STATUS_CACHE_TTL_MS ?? + String(DEFAULT_STATUS_CACHE_TTL_MS), + 10, + ); + return Number.isFinite(parsed) && parsed > 0 + ? parsed + : DEFAULT_STATUS_CACHE_TTL_MS; + } + + private logAborted( + status: CampaignStatus, + meta: Record, + ): void { + this.logger.warn(`aborted: campaign ${this.statusLabel(status)}`, { + ...meta, + status: CampaignStatus[status], + }); + } + + private statusLabel(status: CampaignStatus): string { + switch (status) { + case CampaignStatus.PAUSED: + return 'paused'; + case CampaignStatus.STOPPED: + return 'stopped'; + default: + return `not dispatchable (${CampaignStatus[status]})`; + } + } + + /** Conditional on PENDING: the tabular idempotency lock (FR30, NFR16). */ + private async markSent(row: CampaignContact): Promise { + const updated = await this.campaignContactRepository.update( + { id: row.id, status: CampaignContactStatus.PENDING }, + { status: CampaignContactStatus.SENT, sentAt: new Date() }, + ); + return (updated.affected ?? 0) > 0; + } + + private async markSkipped(row: CampaignContact): Promise { + await this.campaignContactRepository.update( + { id: row.id, status: CampaignContactStatus.PENDING }, + { status: CampaignContactStatus.SKIPPED }, + ); + } + + /** + * CampaignContact has no failure-reason column; the reason is recorded in + * the structured log (correlated via correlationId). Retry/backoff is story + * 4.5 — here both 4xx and exhausted 5xx fail the contact immediately (FR33). + */ + private async markFailed( + row: CampaignContact, + reason: string, + statusCode?: number, + ): Promise { + await this.campaignContactRepository.update( + { id: row.id, status: CampaignContactStatus.PENDING }, + { status: CampaignContactStatus.FAILED }, + ); + this.logger.error('dispatch failed', { + campaignId: row.campaignId, + contactId: row.contactId, + statusCode, + reason, + }); + } + + private dispatchErrorCategory(statusCode?: number): string { + if (statusCode === undefined) return 'dispatch_network'; + if (statusCode >= 400 && statusCode < 500) return 'dispatch_4xx'; + if (statusCode >= 500) return 'dispatch_5xx'; + return 'dispatch_unexpected'; + } +} From 5beda6e13901d9170eb26b9f4e42f24be011345c Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Wed, 10 Jun 2026 11:24:59 -0300 Subject: [PATCH 4/7] feat(campaign-sender): module and campaigns.send consumer with metrics (EVO-1217) Consumer mirrors the packer pattern: contract validation up-front, correlationId wrapping and shared ack/nack policy (terminal -> DLQ). Emits per-message duration (p50/p95/p99), throughput, categorized errors and a 15s background consumer-lag poll via PipelineMetricsService (NFR33). Co-Authored-By: Claude Fable 5 --- .../campaign-sender/campaign-sender.module.ts | 25 +++ .../consumers/campaigns-send.consumer.spec.ts | 206 ++++++++++++++++++ .../consumers/campaigns-send.consumer.ts | 136 ++++++++++++ 3 files changed, 367 insertions(+) create mode 100644 src/runners/campaign-sender/campaign-sender.module.ts create mode 100644 src/runners/campaign-sender/consumers/campaigns-send.consumer.spec.ts create mode 100644 src/runners/campaign-sender/consumers/campaigns-send.consumer.ts diff --git a/src/runners/campaign-sender/campaign-sender.module.ts b/src/runners/campaign-sender/campaign-sender.module.ts new file mode 100644 index 0000000..dd929b4 --- /dev/null +++ b/src/runners/campaign-sender/campaign-sender.module.ts @@ -0,0 +1,25 @@ +import { Module } from '@nestjs/common'; +import { CampaignSenderService } from './services/campaign-sender.service'; +import { BatchDispatcherService } from './services/batch-dispatcher.service'; +import { CampaignsSendConsumer } from './consumers/campaigns-send.consumer'; + +/** + * Runner module for RUN_MODE=campaign-sender (story 4.3 / EVO-1217). + * + * Boots the `campaigns.send` consumer, the sender service and the batch + * dispatcher. IMESSAGE_BROKER (BrokerModule), CrmInboxDispatcher + * (MessagingChannelsModule), ContactsClientService (CrmClientModule), + * PipelineMetricsService (PipelineMetricsModule), CorrelationContext + * (CorrelationModule), TenantDbContext (TenantDbContextModule) and + * CustomLoggerService (CommonModule) are all @Global, so this module only + * declares its own consumer + services. Imported conditionally from + * AppModule.forRoot() when AppFactory.shouldStartCampaignSender() is true. + */ +@Module({ + providers: [ + CampaignSenderService, + BatchDispatcherService, + CampaignsSendConsumer, + ], +}) +export class CampaignSenderModule {} diff --git a/src/runners/campaign-sender/consumers/campaigns-send.consumer.spec.ts b/src/runners/campaign-sender/consumers/campaigns-send.consumer.spec.ts new file mode 100644 index 0000000..0eec061 --- /dev/null +++ b/src/runners/campaign-sender/consumers/campaigns-send.consumer.spec.ts @@ -0,0 +1,206 @@ +import { CampaignsSendConsumer } from './campaigns-send.consumer'; +import { CampaignNotFoundError } from '../errors/campaign-not-found.error'; +import { + CAMPAIGNS_SEND_TOPIC, + type CampaignsSendContract, +} from '../../../shared/broker/contracts/campaigns-send.contract'; +import type { BrokerMessage } from '../../../shared/broker/interfaces/message-broker.interface'; + +const validPayload: CampaignsSendContract = { + campaignId: 'camp-1', + page: 1, + totalPages: 2, + contactIds: ['c1', 'c2'], + templateId: 'tpl-1', + channelType: 'whatsapp', + correlationId: '11111111-1111-4111-8111-111111111111', +}; + +const buildMsg = (payload: unknown): BrokerMessage => ({ + id: 'm1', + payload: payload as CampaignsSendContract, + headers: {}, + raw: {}, +}); + +describe('CampaignsSendConsumer', () => { + let consumer: CampaignsSendConsumer; + let broker: { + subscribe: jest.Mock; + ack: jest.Mock; + nack: jest.Mock; + getTopicLag: jest.Mock; + }; + let send: jest.Mock; + let runWithCorrelationId: jest.Mock; + let logger: { log: jest.Mock; warn: jest.Mock; error: jest.Mock }; + let metrics: { + observeRequestDuration: jest.Mock; + incError: jest.Mock; + setConsumerLag: jest.Mock; + }; + + beforeEach(() => { + broker = { + subscribe: jest.fn(), + ack: jest.fn(), + nack: jest.fn(), + getTopicLag: jest.fn().mockResolvedValue(0), + }; + send = jest.fn(); + runWithCorrelationId = jest.fn((_id: string, fn: () => unknown) => fn()); + logger = { log: jest.fn(), warn: jest.fn(), error: jest.fn() }; + metrics = { + observeRequestDuration: jest.fn(), + incError: jest.fn(), + setConsumerLag: jest.fn(), + }; + consumer = new CampaignsSendConsumer( + broker as any, + { send } as any, + { runWithCorrelationId } as any, + logger as any, + metrics as any, + ); + }); + + afterEach(() => { + consumer.onModuleDestroy(); + jest.useRealTimers(); + }); + + async function getHandler() { + await consumer.onModuleInit(); + return broker.subscribe.mock.calls[0][1] as ( + m: BrokerMessage, + ) => Promise; + } + + it('AC5: subscribes to campaigns.send on module init', async () => { + await consumer.onModuleInit(); + expect(broker.subscribe).toHaveBeenCalledWith( + CAMPAIGNS_SEND_TOPIC, + expect.any(Function), + ); + }); + + it('acks after a successful send and observes the message duration', async () => { + send.mockResolvedValueOnce({ + dispatched: 2, + skipped: 0, + failed: 0, + aborted: false, + }); + const handler = await getHandler(); + + await handler(buildMsg(validPayload)); + + expect(send).toHaveBeenCalledWith(validPayload); + expect(broker.ack).toHaveBeenCalledTimes(1); + expect(broker.nack).not.toHaveBeenCalled(); + expect(metrics.observeRequestDuration).toHaveBeenCalledWith( + CAMPAIGNS_SEND_TOPIC, + expect.any(Number), + ); + }); + + it('AC3: acks a pause/stop abort (send returns normally)', async () => { + send.mockResolvedValueOnce({ + dispatched: 0, + skipped: 0, + failed: 0, + aborted: true, + }); + const handler = await getHandler(); + + await handler(buildMsg(validPayload)); + + expect(broker.ack).toHaveBeenCalledTimes(1); + expect(broker.nack).not.toHaveBeenCalled(); + }); + + it('wraps processing in the payload correlationId', async () => { + send.mockResolvedValueOnce({ + dispatched: 0, + skipped: 2, + failed: 0, + aborted: false, + }); + const handler = await getHandler(); + + await handler(buildMsg(validPayload)); + + expect(runWithCorrelationId).toHaveBeenCalledWith( + validPayload.correlationId, + expect.any(Function), + ); + }); + + it('nack(requeue=false) on a terminal error', async () => { + send.mockRejectedValueOnce(new CampaignNotFoundError('camp-1')); + const handler = await getHandler(); + + await handler(buildMsg(validPayload)); + + expect(broker.nack).toHaveBeenCalledWith(expect.anything(), false); + expect(broker.ack).not.toHaveBeenCalled(); + }); + + it('nack(requeue=true) on a transient error', async () => { + send.mockRejectedValueOnce(new Error('db unavailable')); + const handler = await getHandler(); + + await handler(buildMsg(validPayload)); + + expect(broker.nack).toHaveBeenCalledWith(expect.anything(), true); + }); + + it('nack(requeue=false) on a malformed payload, without calling send', async () => { + const handler = await getHandler(); + + await handler(buildMsg({ campaignId: 'x' })); + + expect(broker.nack).toHaveBeenCalledWith(expect.anything(), false); + expect(send).not.toHaveBeenCalled(); + expect(runWithCorrelationId).not.toHaveBeenCalled(); + expect(metrics.incError).toHaveBeenCalledWith('malformed_payload'); + }); + + it('AC6: polls the broker lag and publishes the consumer_lag gauge', async () => { + jest.useFakeTimers(); + broker.getTopicLag.mockResolvedValue(42); + + await consumer.onModuleInit(); + await jest.advanceTimersByTimeAsync(15_000); + + expect(broker.getTopicLag).toHaveBeenCalledWith(CAMPAIGNS_SEND_TOPIC); + expect(metrics.setConsumerLag).toHaveBeenCalledWith( + CAMPAIGNS_SEND_TOPIC, + 42, + ); + }); + + it('a failed lag poll only warns and never touches message processing', async () => { + jest.useFakeTimers(); + broker.getTopicLag.mockRejectedValue(new Error('admin down')); + + await consumer.onModuleInit(); + await jest.advanceTimersByTimeAsync(15_000); + + expect(metrics.setConsumerLag).not.toHaveBeenCalled(); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('consumer lag poll failed'), + 'CampaignsSendConsumer', + ); + }); + + it('stops the lag poll on module destroy', async () => { + jest.useFakeTimers(); + await consumer.onModuleInit(); + consumer.onModuleDestroy(); + + await jest.advanceTimersByTimeAsync(60_000); + + expect(broker.getTopicLag).not.toHaveBeenCalled(); + }); +}); diff --git a/src/runners/campaign-sender/consumers/campaigns-send.consumer.ts b/src/runners/campaign-sender/consumers/campaigns-send.consumer.ts new file mode 100644 index 0000000..b1bfde9 --- /dev/null +++ b/src/runners/campaign-sender/consumers/campaigns-send.consumer.ts @@ -0,0 +1,136 @@ +import { + Inject, + Injectable, + OnModuleDestroy, + OnModuleInit, +} from '@nestjs/common'; +import { + IMESSAGE_BROKER, + IMessageBroker, + BrokerMessage, +} from '../../../shared/broker/interfaces/message-broker.interface'; +import { + CAMPAIGNS_SEND_TOPIC, + isCampaignsSendContract, + type CampaignsSendContract, +} from '../../../shared/broker/contracts/campaigns-send.contract'; +import { CorrelationContext } from '../../../shared/correlation/correlation.context'; +import { CustomLoggerService } from '../../../common/services/custom-logger.service'; +import { PipelineMetricsService } from '../../../shared/metrics/pipeline-metrics.service'; +import { processWithAckPolicy } from '../../../shared/broker/consumer/process-with-ack-policy'; +import { CampaignSenderService } from '../services/campaign-sender.service'; + +const LOG_CONTEXT = 'CampaignsSendConsumer'; +const DEFAULT_LAG_POLL_INTERVAL_MS = 15_000; + +/** + * Broker consumer for `campaigns.send` (story 4.3 / EVO-1217). Subscribes on + * boot and routes each page batch to `CampaignSenderService`, wrapping + * processing in the payload's `correlationId` so every downstream log carries + * it. The consumer group is named per RUN_MODE by the broker adapter, so + * multiple campaign-sender replicas share the topic's partitions (FR6). + * + * Ack/nack is delegated to the shared `processWithAckPolicy`: success (and a + * pause/stop abort, which returns normally) → ack, `TerminalError` (campaign + * or template missing) → nack(requeue=false), any other error → + * nack(requeue=true). A structurally invalid payload is dropped up-front. + * + * Observability (NFR33, descoped here from EVO-1223): per-message duration + * feeds the p50/p95/p99 summary, and a background poll publishes the topic's + * consumer lag via `PipelineMetricsService.setConsumerLag`. + */ +@Injectable() +export class CampaignsSendConsumer implements OnModuleInit, OnModuleDestroy { + private lagTimer: NodeJS.Timeout | null = null; + + constructor( + @Inject(IMESSAGE_BROKER) private readonly broker: IMessageBroker, + private readonly sender: CampaignSenderService, + private readonly correlation: CorrelationContext, + private readonly logger: CustomLoggerService, + private readonly metrics: PipelineMetricsService, + ) {} + + async onModuleInit(): Promise { + await this.broker.subscribe( + CAMPAIGNS_SEND_TOPIC, + (msg) => this.handle(msg), + ); + this.logger.log(`Subscribed to ${CAMPAIGNS_SEND_TOPIC}`, LOG_CONTEXT); + + this.lagTimer = setInterval( + () => void this.pollConsumerLag(), + this.lagPollIntervalMs(), + ); + // Lag polling must never keep a draining process alive. + this.lagTimer.unref?.(); + } + + onModuleDestroy(): void { + if (this.lagTimer) { + clearInterval(this.lagTimer); + this.lagTimer = null; + } + } + + private async handle( + msg: BrokerMessage, + ): Promise { + if (!isCampaignsSendContract(msg.payload)) { + this.logger.warn( + `Invalid ${CAMPAIGNS_SEND_TOPIC} payload (messageId=${msg.id}) — nack(requeue=false)`, + LOG_CONTEXT, + ); + this.metrics.incError('malformed_payload'); + await this.broker.nack(msg, false); + return; + } + + const payload = msg.payload; + const startedAt = Date.now(); + + await this.correlation.runWithCorrelationId(payload.correlationId, () => + processWithAckPolicy( + msg, + this.broker, + { + logger: this.logger, + context: LOG_CONTEXT, + meta: { campaignId: payload.campaignId, page: payload.page }, + }, + async () => { + await this.sender.send(payload); + }, + ), + ); + + this.metrics.observeRequestDuration( + CAMPAIGNS_SEND_TOPIC, + (Date.now() - startedAt) / 1000, + ); + } + + private async pollConsumerLag(): Promise { + try { + const lag = await this.broker.getTopicLag(CAMPAIGNS_SEND_TOPIC); + this.metrics.setConsumerLag(CAMPAIGNS_SEND_TOPIC, lag); + } catch (err) { + // Best-effort: a failed poll must never disturb message processing. + this.logger.warn( + `consumer lag poll failed: ${(err as Error).message}`, + LOG_CONTEXT, + ); + } + } + + private lagPollIntervalMs(): number { + const parsed = parseInt( + process.env.CAMPAIGN_SENDER_LAG_POLL_MS ?? + String(DEFAULT_LAG_POLL_INTERVAL_MS), + 10, + ); + return Number.isFinite(parsed) && parsed > 0 + ? parsed + : DEFAULT_LAG_POLL_INTERVAL_MS; + } +} From cf4cd632562ef565c49f337ecb4dbadf240ac3ff Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Wed, 10 Jun 2026 11:24:59 -0300 Subject: [PATCH 5/7] feat(campaign-sender): wire RUN_MODE=campaign-sender replacing the 1.1 stub (EVO-1217) Co-Authored-By: Claude Fable 5 --- src/app.module.ts | 4 ++++ src/main.ts | 8 +++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/app.module.ts b/src/app.module.ts index 32d4143..7c86759 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -40,6 +40,7 @@ import { AudienceModule } from './shared/audience/audience.module'; import { MessagingChannelsModule } from './shared/messaging-channels/messaging-channels.module'; import { EventReceiverModule } from './runners/event-receiver/event-receiver.module'; import { CampaignPackerModule } from './runners/campaign-packer/campaign-packer.module'; +import { CampaignSenderModule } from './runners/campaign-sender/campaign-sender.module'; import { EventProcessModule } from './runners/event-process/event-process.module'; import { AppFactory } from './app-factory'; import { @@ -104,6 +105,9 @@ export class AppModule { if (AppFactory.shouldStartCampaignPacker()) { conditionalImports.push(CampaignPackerModule); } + if (AppFactory.shouldStartCampaignSender()) { + conditionalImports.push(CampaignSenderModule); + } if (AppFactory.shouldStartEventProcess()) { conditionalImports.push(EventProcessModule); } diff --git a/src/main.ts b/src/main.ts index ebb2b06..d69597b 100644 --- a/src/main.ts +++ b/src/main.ts @@ -2,7 +2,6 @@ import * as dotenv from 'dotenv'; dotenv.config(); -import { RunMode } from './modules/processing/enums/run-mode.enum'; import { parseRunMode } from './modules/processing/config/processing.config'; // Fail-fast validation BEFORE NestFactory.create (EVO-1194 AC3). @@ -13,10 +12,9 @@ parseRunMode(process.env.RUN_MODE); // Stub-mode short-circuit for RUN_MODEs whose dedicated modules have not landed yet. // EVO-1194 introduces the names so docker-compose / k8s manifests can already // reference them; each downstream story wires its module in and removes the -// matching entry from this Set. -const STUB_RUN_MODES = new Set([ - RunMode.CAMPAIGN_SENDER, // wired by downstream campaign-sender story (epic 4) -]); +// matching entry from this Set. Empty since campaign-sender (EVO-1217) landed — +// kept for the next pre-wired RUN_MODE. +const STUB_RUN_MODES = new Set([]); if (STUB_RUN_MODES.has(process.env.RUN_MODE ?? '')) { // Structured JSON to stderr so log collectors (Loki / Datadog) ingest the // stub-exit event with proper severity instead of treating it as untagged From 3edf4523d8e109a2b98b6c7b415c77adcdd542dd Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Wed, 10 Jun 2026 11:33:55 -0300 Subject: [PATCH 6/7] fix(broker): use assertQueue in RabbitMQ getTopicLag to avoid channel-closing 404 (EVO-1217) checkQueue on a missing queue raises a channel-level error that closes the adapter's single shared channel, so a best-effort metrics poll could take down publish/ack for the whole process. assertQueue with the consumer's exact declaration is idempotent and returns the same messageCount. Co-Authored-By: Claude Fable 5 --- src/shared/broker/adapters/rabbitmq-broker.adapter.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/shared/broker/adapters/rabbitmq-broker.adapter.ts b/src/shared/broker/adapters/rabbitmq-broker.adapter.ts index 50317b2..f5cadcc 100644 --- a/src/shared/broker/adapters/rabbitmq-broker.adapter.ts +++ b/src/shared/broker/adapters/rabbitmq-broker.adapter.ts @@ -231,7 +231,13 @@ export class RabbitMQBrokerAdapter async getTopicLag(topic: string): Promise { this.assertActive('getTopicLag'); const queueName = `${this.resolveRunMode(topic)}-${topic}`; - const { messageCount } = await this.channel!.checkQueue(queueName); + // Same declaration as attachConsumer, so this is idempotent. checkQueue + // would 404 on a missing queue and that is a channel-level error that + // CLOSES the adapter's single shared channel — a best-effort metrics poll + // must never take down publish/ack for the whole process. + const { messageCount } = await this.channel!.assertQueue(queueName, { + durable: true, + }); return messageCount; } From 558b5b3418f46fcc1bfd37a617b6efa31194a64b Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Wed, 10 Jun 2026 11:33:55 -0300 Subject: [PATCH 7/7] =?UTF-8?q?fix(campaign-sender):=20review=20findings?= =?UTF-8?q?=20=E2=80=94=20pending-only=20hydration,=20dedupe,=20cache=20bo?= =?UTF-8?q?und=20(EVO-1217)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - hydrate only PENDING contacts so a redelivered already-SENT page skips cheaply instead of re-fetching the whole batch from the CRM (NFR16) - dedupe payload contactIds: a repeated id would dispatch twice since the in-memory row stays PENDING after the first send - bound the campaign status cache (prune expired entries past 1000) - log 'campaign contact failed' instead of 'dispatch failed' on the pre-dispatch contact_not_found path; JSON-stringify object-valued custom attributes in template rendering Co-Authored-By: Claude Fable 5 --- .../services/batch-dispatcher.service.ts | 6 +++- .../services/campaign-sender.service.spec.ts | 16 ++++++++- .../services/campaign-sender.service.ts | 33 ++++++++++++++++--- 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/src/runners/campaign-sender/services/batch-dispatcher.service.ts b/src/runners/campaign-sender/services/batch-dispatcher.service.ts index c3ce295..22b368b 100644 --- a/src/runners/campaign-sender/services/batch-dispatcher.service.ts +++ b/src/runners/campaign-sender/services/batch-dispatcher.service.ts @@ -85,7 +85,11 @@ export class BatchDispatcherService { }; for (const [key, value] of Object.entries(contact.customAttributes ?? {})) { values[`contact.${key}`] = - value === null || value === undefined ? '' : String(value); + value === null || value === undefined + ? '' + : typeof value === 'object' + ? JSON.stringify(value) + : String(value); } let rendered = content; diff --git a/src/runners/campaign-sender/services/campaign-sender.service.spec.ts b/src/runners/campaign-sender/services/campaign-sender.service.spec.ts index 7d8cdd1..9bfd1e2 100644 --- a/src/runners/campaign-sender/services/campaign-sender.service.spec.ts +++ b/src/runners/campaign-sender/services/campaign-sender.service.spec.ts @@ -129,10 +129,24 @@ describe('CampaignSenderService', () => { 'skipped: already sent', expect.objectContaining({ contactId: 'c1' }), ); + // Only the still-PENDING contact is hydrated from the CRM (NFR16). + expect(findByIds).toHaveBeenCalledWith(['c2']); expect(result.skipped).toBe(1); expect(result.dispatched).toBe(1); }); + it('dispatches a duplicated contactId only once', async () => { + campaignFindOne.mockResolvedValue(campaign()); + contactFind.mockResolvedValue([row('c1')]); + findByIds.mockResolvedValue([dto('c1')]); + + const result = await service.send(payload(['c1', 'c1', 'c1'])); + + expect(dispatch).toHaveBeenCalledTimes(1); + expect(result.dispatched).toBe(1); + expect(result.skipped).toBe(0); + }); + it('AC3: aborts mid-batch when the campaign flips to Paused after the status cache expires', async () => { campaignFindOne .mockResolvedValueOnce(campaign(CampaignStatus.SENDING)) @@ -181,7 +195,7 @@ describe('CampaignSenderService', () => { { status: CampaignContactStatus.FAILED }, ); expect(logger.error).toHaveBeenCalledWith( - 'dispatch failed', + 'campaign contact failed', expect.objectContaining({ contactId: 'c1', statusCode: 422, diff --git a/src/runners/campaign-sender/services/campaign-sender.service.ts b/src/runners/campaign-sender/services/campaign-sender.service.ts index e7cbba1..3ab106a 100644 --- a/src/runners/campaign-sender/services/campaign-sender.service.ts +++ b/src/runners/campaign-sender/services/campaign-sender.service.ts @@ -29,6 +29,7 @@ export interface SendResult { } const DEFAULT_STATUS_CACHE_TTL_MS = 5_000; +const STATUS_CACHE_MAX_ENTRIES = 1_000; const DISPATCHABLE_STATUSES = new Set([ CampaignStatus.SENDING, @@ -111,13 +112,25 @@ export class CampaignSenderService { payload.templateId, ); + // Dedupe defensively: the packer never repeats an id within a page, but a + // duplicated id would dispatch twice (the in-memory row stays PENDING + // after the first send — the tabular lock only guards cross-process races). + const contactIds = Array.from(new Set(payload.contactIds)); + const rows = await this.campaignContactRepository.find({ - where: { campaignId, contactId: In(payload.contactIds) }, + where: { campaignId, contactId: In(contactIds) }, }); const rowByContactId = new Map(rows.map((row) => [row.contactId, row])); - const contacts = await this.hydrateContacts(payload.contactIds); - for (const contactId of payload.contactIds) { + // Hydrate only contacts that can actually dispatch — a redelivered page of + // already-SENT contacts must skip cheaply (NFR16), not re-fetch the whole + // batch from the CRM. + const pendingIds = contactIds.filter( + (id) => rowByContactId.get(id)?.status === CampaignContactStatus.PENDING, + ); + const contacts = await this.hydrateContacts(pendingIds); + + for (const contactId of contactIds) { const row = rowByContactId.get(contactId); if (!row) { result.skipped++; @@ -252,7 +265,17 @@ export class CampaignSenderService { } private cacheStatus(campaignId: string, status: CampaignStatus): void { - this.statusCache.set(campaignId, { status, fetchedAt: Date.now() }); + const now = Date.now(); + // The consumer is long-lived: prune expired entries so the cache cannot + // grow unbounded across the campaigns this instance ever touched. + if (this.statusCache.size >= STATUS_CACHE_MAX_ENTRIES) { + for (const [key, entry] of this.statusCache) { + if (now - entry.fetchedAt >= this.statusCacheTtlMs()) { + this.statusCache.delete(key); + } + } + } + this.statusCache.set(campaignId, { status, fetchedAt: now }); } private statusCacheTtlMs(): number { @@ -317,7 +340,7 @@ export class CampaignSenderService { { id: row.id, status: CampaignContactStatus.PENDING }, { status: CampaignContactStatus.FAILED }, ); - this.logger.error('dispatch failed', { + this.logger.error('campaign contact failed', { campaignId: row.campaignId, contactId: row.contactId, statusCode,