From df2a1fefe9eb3263592fb344f174968711bd3eba Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Fri, 12 Jun 2026 11:54:48 -0300 Subject: [PATCH 1/2] feat(evo-flow): pause/stop fast-path via campaigns.control consumers (EVO-1222) Story 4.8. The pause/stop/resume REST endpoints already wrote the authoritative Campaign.status flag and the sender already rechecked it (4.3), but propagation waited out the 5s status-cache TTL. This adds the broker fast-path so the change is honored in <1s, keeping the Postgres flag as the source of truth. - CampaignsService.pause/resume/stop (and bulk) publish campaigns.control { campaignId, action, correlationId } after the status save. The publish is best-effort: a broker outage never fails the transition (the sender honors the flag at the next TTL recheck), so it cannot trip the controller's workflow compensation. - CampaignsControlConsumer on the sender drops the campaign's cached status so the next dispatch recheck re-reads Postgres immediately. - CampaignsControlConsumer on the packer aborts an in-flight pagination on pause/stop and clears the flag on resume (best-effort; the sender guard is authoritative). - Kafka adapter provisions campaigns.control single-partition (ordered pause/resume) with 24h retention. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../services/campaigns.service.spec.ts | 96 ++++++++++++++++++ .../campaigns/services/campaigns.service.ts | 63 +++++++++++- .../campaign-packer/campaign-packer.module.ts | 8 +- .../campaigns-control.consumer.spec.ts | 94 ++++++++++++++++++ .../consumers/campaigns-control.consumer.ts | 80 +++++++++++++++ .../services/campaign-packer.service.spec.ts | 31 ++++++ .../services/campaign-packer.service.ts | 51 +++++++--- .../campaign-sender/campaign-sender.module.ts | 2 + .../campaigns-control.consumer.spec.ts | 97 +++++++++++++++++++ .../consumers/campaigns-control.consumer.ts | 79 +++++++++++++++ .../services/campaign-sender.service.ts | 10 ++ .../broker/adapters/kafka-broker.adapter.ts | 19 +++- 12 files changed, 613 insertions(+), 17 deletions(-) create mode 100644 src/modules/campaigns/services/campaigns.service.spec.ts create mode 100644 src/runners/campaign-packer/consumers/campaigns-control.consumer.spec.ts create mode 100644 src/runners/campaign-packer/consumers/campaigns-control.consumer.ts create mode 100644 src/runners/campaign-sender/consumers/campaigns-control.consumer.spec.ts create mode 100644 src/runners/campaign-sender/consumers/campaigns-control.consumer.ts diff --git a/src/modules/campaigns/services/campaigns.service.spec.ts b/src/modules/campaigns/services/campaigns.service.spec.ts new file mode 100644 index 0000000..a40de88 --- /dev/null +++ b/src/modules/campaigns/services/campaigns.service.spec.ts @@ -0,0 +1,96 @@ +import { CampaignsService } from './campaigns.service'; +import { Campaign, CampaignStatus } from '../entities/campaign.entity'; +import { CAMPAIGNS_CONTROL_TOPIC } from '../../../shared/broker/contracts/campaigns-control.contract'; + +/** + * EVO-1222 [4.8]: the status-transition methods publish the fast-path + * `campaigns.control` event after writing the authoritative Postgres flag. + */ +describe('CampaignsService — campaigns.control publishing', () => { + let service: CampaignsService; + let repo: { findOne: jest.Mock; save: jest.Mock }; + let broker: { publish: jest.Mock }; + let correlation: { + getCorrelationId: jest.Mock; + resolveIncoming: jest.Mock; + }; + + const CORRELATION_ID = '11111111-1111-4111-8111-111111111111'; + + beforeEach(() => { + repo = { + findOne: jest.fn(), + save: jest.fn().mockImplementation((c: Campaign) => Promise.resolve(c)), + }; + broker = { publish: jest.fn() }; + correlation = { + getCorrelationId: jest.fn().mockReturnValue(CORRELATION_ID), + resolveIncoming: jest.fn().mockReturnValue(CORRELATION_ID), + }; + const db = { getRepository: jest.fn().mockReturnValue(repo) }; + service = new CampaignsService(db as any, broker as any, correlation as any); + }); + + const seed = (status: CampaignStatus) => + repo.findOne.mockResolvedValueOnce({ id: 'camp-1', status } as Campaign); + + it('AC1: pause publishes a pause control event after persisting PAUSED', async () => { + seed(CampaignStatus.SENDING); + + await service.pause('camp-1'); + + expect(repo.save).toHaveBeenCalledWith( + expect.objectContaining({ status: CampaignStatus.PAUSED }), + ); + expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, { + campaignId: 'camp-1', + action: 'pause', + correlationId: CORRELATION_ID, + }); + }); + + it('AC3: resume publishes a resume control event', async () => { + seed(CampaignStatus.PAUSED); + + await service.resume('camp-1'); + + expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, { + campaignId: 'camp-1', + action: 'resume', + correlationId: CORRELATION_ID, + }); + }); + + it('AC4: stop publishes a stop control event', async () => { + seed(CampaignStatus.SENDING); + + await service.stop('camp-1'); + + expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, { + campaignId: 'camp-1', + action: 'stop', + correlationId: CORRELATION_ID, + }); + }); + + it('does not publish when the transition is rejected', async () => { + seed(CampaignStatus.DRAFT); // pause requires SENDING + + await expect(service.pause('camp-1')).rejects.toThrow(); + expect(broker.publish).not.toHaveBeenCalled(); + }); + + it('does not fail the transition when the fast-path publish throws (authoritative flag already persisted)', async () => { + seed(CampaignStatus.SENDING); + broker.publish.mockRejectedValueOnce(new Error('broker unavailable')); + const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {}); + + const result = await service.pause('camp-1'); + + expect(result).toEqual( + expect.objectContaining({ status: CampaignStatus.PAUSED }), + ); + expect(warnSpy).toHaveBeenCalled(); + warnSpy.mockRestore(); + }); +}); diff --git a/src/modules/campaigns/services/campaigns.service.ts b/src/modules/campaigns/services/campaigns.service.ts index 83c28a0..8621c7a 100644 --- a/src/modules/campaigns/services/campaigns.service.ts +++ b/src/modules/campaigns/services/campaigns.service.ts @@ -1,4 +1,5 @@ import { + Inject, Injectable, NotFoundException, BadRequestException, @@ -12,15 +13,61 @@ import { } from '../entities/campaign.entity'; import { CreateCampaignDto, UpdateCampaignDto, CampaignQueryDto } from '../dto'; import { TenantDbContext } from '../../../evo-extension-points'; +import { + IMESSAGE_BROKER, + IMessageBroker, +} from '../../../shared/broker/interfaces/message-broker.interface'; +import { + CAMPAIGNS_CONTROL_TOPIC, + type CampaignControlAction, +} from '../../../shared/broker/contracts/campaigns-control.contract'; +import { CorrelationContext } from '../../../shared/correlation/correlation.context'; @Injectable() export class CampaignsService { - constructor(private readonly db: TenantDbContext) {} + constructor( + private readonly db: TenantDbContext, + @Inject(IMESSAGE_BROKER) private readonly broker: IMessageBroker, + private readonly correlation: CorrelationContext, + ) {} private get campaignRepository(): Repository { return this.db.getRepository(Campaign); } + /** + * EVO-1222 [4.8]: publish the fast-path `campaigns.control` event after an + * authoritative status transition so packer/sender drop their cached status + * and honor the change in <1s (the Postgres flag remains the source of + * truth). Reuses the request correlation id, minting one if absent. + */ + private async publishControl( + campaignId: string, + action: CampaignControlAction, + ): Promise { + try { + await this.broker.publish(CAMPAIGNS_CONTROL_TOPIC, { + campaignId, + action, + correlationId: this.correlation.resolveIncoming( + this.correlation.getCorrelationId(), + ), + }); + } catch (err) { + // Fast-path only: the authoritative Postgres status was already persisted, + // so a broker outage must NOT fail the transition (nor trip the + // controller's workflow compensation). The sender honors the flag at its + // next recheck (≤5s TTL, within NFR5). Reported via console to match this + // service's existing error-reporting style. + console.warn( + `[campaigns.control] publish failed for campaign ${campaignId} ` + + `(${action}); relying on the authoritative status flag: ${ + (err as Error).message + }`, + ); + } + } + async create(createCampaignDto: CreateCampaignDto): Promise { // Check for duplicate name const existingCampaign = await this.campaignRepository.findOne({ @@ -188,7 +235,9 @@ export class CampaignsService { } campaign.status = CampaignStatus.PAUSED; - return this.campaignRepository.save(campaign); + const saved = await this.campaignRepository.save(campaign); + await this.publishControl(id, 'pause'); + return saved; } async resume(id: string): Promise { @@ -201,7 +250,9 @@ export class CampaignsService { } campaign.status = CampaignStatus.SENDING; - return this.campaignRepository.save(campaign); + const saved = await this.campaignRepository.save(campaign); + await this.publishControl(id, 'resume'); + return saved; } async stop(id: string): Promise { @@ -219,7 +270,9 @@ export class CampaignsService { } campaign.status = CampaignStatus.STOPPED; - return this.campaignRepository.save(campaign); + const saved = await this.campaignRepository.save(campaign); + await this.publishControl(id, 'stop'); + return saved; } async duplicate(id: string): Promise { @@ -267,12 +320,14 @@ export class CampaignsService { if (campaign.status === CampaignStatus.SENDING) { campaign.status = CampaignStatus.PAUSED; await this.campaignRepository.save(campaign); + await this.publishControl(campaign.id, 'pause'); affectedCount++; } } else if (action === 'resume') { if (campaign.status === CampaignStatus.PAUSED) { campaign.status = CampaignStatus.SENDING; await this.campaignRepository.save(campaign); + await this.publishControl(campaign.id, 'resume'); affectedCount++; } } else if (action === 'delete') { diff --git a/src/runners/campaign-packer/campaign-packer.module.ts b/src/runners/campaign-packer/campaign-packer.module.ts index f56f07c..2b60b1c 100644 --- a/src/runners/campaign-packer/campaign-packer.module.ts +++ b/src/runners/campaign-packer/campaign-packer.module.ts @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common'; import { CampaignPackerService } from './services/campaign-packer.service'; import { PaginationService } from './services/pagination.service'; import { CampaignsPackConsumer } from './consumers/campaigns-pack.consumer'; +import { CampaignsControlConsumer } from './consumers/campaigns-control.consumer'; /** * Runner module for RUN_MODE=campaign-packer (story 4.1 / EVO-1215). @@ -14,6 +15,11 @@ import { CampaignsPackConsumer } from './consumers/campaigns-pack.consumer'; * AppModule.forRoot() when AppFactory.shouldStartCampaignPacker() is true. */ @Module({ - providers: [CampaignPackerService, PaginationService, CampaignsPackConsumer], + providers: [ + CampaignPackerService, + PaginationService, + CampaignsPackConsumer, + CampaignsControlConsumer, + ], }) export class CampaignPackerModule {} diff --git a/src/runners/campaign-packer/consumers/campaigns-control.consumer.spec.ts b/src/runners/campaign-packer/consumers/campaigns-control.consumer.spec.ts new file mode 100644 index 0000000..1d4c788 --- /dev/null +++ b/src/runners/campaign-packer/consumers/campaigns-control.consumer.spec.ts @@ -0,0 +1,94 @@ +import { CampaignsControlConsumer } from './campaigns-control.consumer'; +import { + CAMPAIGNS_CONTROL_TOPIC, + type CampaignsControlContract, +} from '../../../shared/broker/contracts/campaigns-control.contract'; +import type { BrokerMessage } from '../../../shared/broker/interfaces/message-broker.interface'; + +const buildMsg = ( + payload: unknown, +): BrokerMessage => ({ + id: 'm1', + payload: payload as CampaignsControlContract, + headers: {}, + raw: {}, +}); + +const control = ( + action: CampaignsControlContract['action'], +): CampaignsControlContract => ({ + campaignId: 'camp-1', + action, + correlationId: '11111111-1111-4111-8111-111111111111', +}); + +describe('CampaignsControlConsumer (campaign-packer)', () => { + let consumer: CampaignsControlConsumer; + let broker: { subscribe: jest.Mock; ack: jest.Mock; nack: jest.Mock }; + let markPaginationAborted: jest.Mock; + let clearPaginationAborted: jest.Mock; + let runWithCorrelationId: jest.Mock; + let logger: { log: jest.Mock; warn: jest.Mock; error: jest.Mock }; + + beforeEach(() => { + broker = { subscribe: jest.fn(), ack: jest.fn(), nack: jest.fn() }; + markPaginationAborted = jest.fn(); + clearPaginationAborted = jest.fn(); + runWithCorrelationId = jest.fn((_id: string, fn: () => unknown) => fn()); + logger = { log: jest.fn(), warn: jest.fn(), error: jest.fn() }; + consumer = new CampaignsControlConsumer( + broker as any, + { markPaginationAborted, clearPaginationAborted } as any, + { runWithCorrelationId } as any, + logger as any, + ); + }); + + async function getHandler() { + await consumer.onModuleInit(); + return broker.subscribe.mock.calls[0][1] as ( + m: BrokerMessage, + ) => Promise; + } + + it('AC5: subscribes to campaigns.control on module init', async () => { + await consumer.onModuleInit(); + expect(broker.subscribe).toHaveBeenCalledWith( + CAMPAIGNS_CONTROL_TOPIC, + expect.any(Function), + ); + }); + + it.each(['pause', 'stop'] as const)( + 'AC1/AC4: marks pagination aborted on %s and acks', + async (action) => { + const handler = await getHandler(); + + await handler(buildMsg(control(action))); + + expect(markPaginationAborted).toHaveBeenCalledWith('camp-1'); + expect(clearPaginationAborted).not.toHaveBeenCalled(); + expect(broker.ack).toHaveBeenCalledTimes(1); + }, + ); + + it('AC3: clears the abort flag on resume', async () => { + const handler = await getHandler(); + + await handler(buildMsg(control('resume'))); + + expect(clearPaginationAborted).toHaveBeenCalledWith('camp-1'); + expect(markPaginationAborted).not.toHaveBeenCalled(); + expect(broker.ack).toHaveBeenCalledTimes(1); + }); + + it('nack(requeue=false) on a malformed payload', async () => { + const handler = await getHandler(); + + await handler(buildMsg({ campaignId: 'x', action: 'nope' })); + + expect(broker.nack).toHaveBeenCalledWith(expect.anything(), false); + expect(markPaginationAborted).not.toHaveBeenCalled(); + expect(clearPaginationAborted).not.toHaveBeenCalled(); + }); +}); diff --git a/src/runners/campaign-packer/consumers/campaigns-control.consumer.ts b/src/runners/campaign-packer/consumers/campaigns-control.consumer.ts new file mode 100644 index 0000000..dd1a5cd --- /dev/null +++ b/src/runners/campaign-packer/consumers/campaigns-control.consumer.ts @@ -0,0 +1,80 @@ +import { Inject, Injectable, OnModuleInit } from '@nestjs/common'; +import { + IMESSAGE_BROKER, + IMessageBroker, + BrokerMessage, +} from '../../../shared/broker/interfaces/message-broker.interface'; +import { + CAMPAIGNS_CONTROL_TOPIC, + isCampaignsControlContract, + type CampaignsControlContract, +} from '../../../shared/broker/contracts/campaigns-control.contract'; +import { CorrelationContext } from '../../../shared/correlation/correlation.context'; +import { CustomLoggerService } from '../../../common/services/custom-logger.service'; +import { processWithAckPolicy } from '../../../shared/broker/consumer/process-with-ack-policy'; +import { CampaignPackerService } from '../services/campaign-packer.service'; + +const LOG_CONTEXT = 'CampaignsControlConsumer'; + +/** + * Broker consumer for `campaigns.control` on the campaign-packer (story 4.8 / + * EVO-1222). A pause/stop flags the campaign so an in-flight pagination stops + * emitting further `campaigns.send` pages; a resume clears the flag. Pagination + * is fast, so this rarely catches an active pack — but a 1M-contact audience + * can be paused mid-split. The authoritative guard remains the sender's status + * recheck; this only avoids queueing work that would be aborted anyway. + * + * A structurally invalid payload is dropped up-front (no correlationId to bind). + */ +@Injectable() +export class CampaignsControlConsumer implements OnModuleInit { + constructor( + @Inject(IMESSAGE_BROKER) private readonly broker: IMessageBroker, + private readonly packer: CampaignPackerService, + private readonly correlation: CorrelationContext, + private readonly logger: CustomLoggerService, + ) {} + + async onModuleInit(): Promise { + await this.broker.subscribe( + CAMPAIGNS_CONTROL_TOPIC, + (msg) => this.handle(msg), + ); + this.logger.log(`Subscribed to ${CAMPAIGNS_CONTROL_TOPIC}`, LOG_CONTEXT); + } + + private async handle( + msg: BrokerMessage, + ): Promise { + if (!isCampaignsControlContract(msg.payload)) { + this.logger.warn( + `Invalid ${CAMPAIGNS_CONTROL_TOPIC} payload (messageId=${msg.id}) — nack(requeue=false)`, + LOG_CONTEXT, + ); + await this.broker.nack(msg, false); + return; + } + + const payload = msg.payload; + + await this.correlation.runWithCorrelationId(payload.correlationId, () => + processWithAckPolicy( + msg, + this.broker, + { + logger: this.logger, + context: LOG_CONTEXT, + meta: { campaignId: payload.campaignId, action: payload.action }, + }, + () => { + if (payload.action === 'resume') { + this.packer.clearPaginationAborted(payload.campaignId); + } else { + this.packer.markPaginationAborted(payload.campaignId); + } + return Promise.resolve(); + }, + ), + ); + } +} diff --git a/src/runners/campaign-packer/services/campaign-packer.service.spec.ts b/src/runners/campaign-packer/services/campaign-packer.service.spec.ts index 6124003..37db54e 100644 --- a/src/runners/campaign-packer/services/campaign-packer.service.spec.ts +++ b/src/runners/campaign-packer/services/campaign-packer.service.spec.ts @@ -120,6 +120,37 @@ describe('CampaignPackerService', () => { ); }); + it('EVO-1222 [4.8]: stops publishing pages once the campaign is flagged aborted mid-pagination', async () => { + findOne.mockResolvedValueOnce({ + id: 'camp-1', + channelType: 'Channel::Email', + templates: [{ messageTemplateId: 'tmpl-1', variant: 'A' }], + }); + computeAudience.mockResolvedValueOnce({ + totalContacts: 1500, + validContacts: 1500, + invalidContacts: 0, + strategy: 'segment', + }); + find.mockResolvedValueOnce(contactRows(1500)); // 2 pages @ batch 1000 + + // A pause/stop control message lands right after the first page is queued. + publish.mockImplementation((topic: string) => { + if (topic === CAMPAIGNS_SEND_TOPIC) { + service.markPaginationAborted('camp-1'); + } + return Promise.resolve(); + }); + + await service.pack(payload); + + expect(sendCalls(publish)).toHaveLength(1); + expect(warn).toHaveBeenCalledWith( + 'campaign.pagination_aborted', + expect.objectContaining({ campaignId: 'camp-1' }), + ); + }); + it('publishes campaigns.tracked completed and warns on empty audience (AC2)', async () => { findOne.mockResolvedValueOnce({ id: 'camp-1', diff --git a/src/runners/campaign-packer/services/campaign-packer.service.ts b/src/runners/campaign-packer/services/campaign-packer.service.ts index 3ae673f..63d68e6 100644 --- a/src/runners/campaign-packer/services/campaign-packer.service.ts +++ b/src/runners/campaign-packer/services/campaign-packer.service.ts @@ -54,6 +54,11 @@ const CHANNEL_TYPE_TO_SEND: Record = { */ @Injectable() export class CampaignPackerService { + // EVO-1222 [4.8]: campaigns whose in-flight pagination must stop emitting + // further `campaigns.send` pages. Set by the `campaigns.control` consumer on + // pause/stop and cleared on resume; also cleared when a pagination finishes. + private readonly abortedCampaigns = new Set(); + constructor( private readonly db: TenantDbContext, private readonly audience: AudienceComputationService, @@ -70,6 +75,14 @@ export class CampaignPackerService { return this.db.getRepository(CampaignContact); } + markPaginationAborted(campaignId: string): void { + this.abortedCampaigns.add(campaignId); + } + + clearPaginationAborted(campaignId: string): void { + this.abortedCampaigns.delete(campaignId); + } + async pack(payload: CampaignsPackContract): Promise { const { campaignId, correlationId } = payload; @@ -145,17 +158,33 @@ export class CampaignPackerService { contacts: contactIds.length, }); - for (const page of pages) { - const message: CampaignsSendContract = { - campaignId: campaign.id, - page: page.page, - totalPages: page.totalPages, - contactIds: page.contactIds as [string, ...string[]], - templateId, - channelType, - correlationId, - }; - await this.broker.publish(CAMPAIGNS_SEND_TOPIC, message); + try { + for (const page of pages) { + // Pause/stop arriving mid-pagination: stop emitting further pages. The + // sender's status recheck is the authoritative guard; this just avoids + // queueing send work that would be aborted anyway (rare — pagination is + // fast, but possible for very large audiences). + if (this.abortedCampaigns.has(campaign.id)) { + this.logger.warn('campaign.pagination_aborted', { + campaignId: campaign.id, + publishedPages: page.page - 1, + totalPages: pages.length, + }); + break; + } + const message: CampaignsSendContract = { + campaignId: campaign.id, + page: page.page, + totalPages: page.totalPages, + contactIds: page.contactIds as [string, ...string[]], + templateId, + channelType, + correlationId, + }; + await this.broker.publish(CAMPAIGNS_SEND_TOPIC, message); + } + } finally { + this.abortedCampaigns.delete(campaign.id); } } diff --git a/src/runners/campaign-sender/campaign-sender.module.ts b/src/runners/campaign-sender/campaign-sender.module.ts index 2a9e49a..ab5f871 100644 --- a/src/runners/campaign-sender/campaign-sender.module.ts +++ b/src/runners/campaign-sender/campaign-sender.module.ts @@ -3,6 +3,7 @@ import { CampaignSenderService } from './services/campaign-sender.service'; import { BatchDispatcherService } from './services/batch-dispatcher.service'; import { RateLimiterService } from './services/rate-limiter.service'; import { CampaignsSendConsumer } from './consumers/campaigns-send.consumer'; +import { CampaignsControlConsumer } from './consumers/campaigns-control.consumer'; /** * Runner module for RUN_MODE=campaign-sender (story 4.3 / EVO-1217). @@ -22,6 +23,7 @@ import { CampaignsSendConsumer } from './consumers/campaigns-send.consumer'; BatchDispatcherService, RateLimiterService, CampaignsSendConsumer, + CampaignsControlConsumer, ], }) export class CampaignSenderModule {} diff --git a/src/runners/campaign-sender/consumers/campaigns-control.consumer.spec.ts b/src/runners/campaign-sender/consumers/campaigns-control.consumer.spec.ts new file mode 100644 index 0000000..a43d022 --- /dev/null +++ b/src/runners/campaign-sender/consumers/campaigns-control.consumer.spec.ts @@ -0,0 +1,97 @@ +import { CampaignsControlConsumer } from './campaigns-control.consumer'; +import { + CAMPAIGNS_CONTROL_TOPIC, + type CampaignsControlContract, +} from '../../../shared/broker/contracts/campaigns-control.contract'; +import type { BrokerMessage } from '../../../shared/broker/interfaces/message-broker.interface'; + +const validPayload: CampaignsControlContract = { + campaignId: 'camp-1', + action: 'pause', + correlationId: '11111111-1111-4111-8111-111111111111', +}; + +const buildMsg = ( + payload: unknown, +): BrokerMessage => ({ + id: 'm1', + payload: payload as CampaignsControlContract, + headers: {}, + raw: {}, +}); + +describe('CampaignsControlConsumer (campaign-sender)', () => { + let consumer: CampaignsControlConsumer; + let broker: { subscribe: jest.Mock; ack: jest.Mock; nack: jest.Mock }; + let invalidateStatusCache: jest.Mock; + let runWithCorrelationId: jest.Mock; + let logger: { log: jest.Mock; warn: jest.Mock; error: jest.Mock }; + + beforeEach(() => { + broker = { subscribe: jest.fn(), ack: jest.fn(), nack: jest.fn() }; + invalidateStatusCache = jest.fn(); + runWithCorrelationId = jest.fn((_id: string, fn: () => unknown) => fn()); + logger = { log: jest.fn(), warn: jest.fn(), error: jest.fn() }; + consumer = new CampaignsControlConsumer( + broker as any, + { invalidateStatusCache } as any, + { runWithCorrelationId } as any, + logger as any, + ); + }); + + async function getHandler() { + await consumer.onModuleInit(); + return broker.subscribe.mock.calls[0][1] as ( + m: BrokerMessage, + ) => Promise; + } + + it('AC5: subscribes to campaigns.control on module init', async () => { + await consumer.onModuleInit(); + expect(broker.subscribe).toHaveBeenCalledWith( + CAMPAIGNS_CONTROL_TOPIC, + expect.any(Function), + ); + }); + + it('AC1: invalidates the cached status for the campaign and acks', async () => { + const handler = await getHandler(); + + await handler(buildMsg(validPayload)); + + expect(invalidateStatusCache).toHaveBeenCalledWith('camp-1'); + expect(broker.ack).toHaveBeenCalledTimes(1); + expect(broker.nack).not.toHaveBeenCalled(); + }); + + it('invalidates regardless of the action (resume also re-reads the flag)', async () => { + const handler = await getHandler(); + + await handler(buildMsg({ ...validPayload, action: 'resume' })); + + expect(invalidateStatusCache).toHaveBeenCalledWith('camp-1'); + expect(broker.ack).toHaveBeenCalledTimes(1); + }); + + it('wraps processing in the payload correlationId', async () => { + const handler = await getHandler(); + + await handler(buildMsg(validPayload)); + + expect(runWithCorrelationId).toHaveBeenCalledWith( + validPayload.correlationId, + expect.any(Function), + ); + }); + + it('nack(requeue=false) on a malformed payload, without invalidating', async () => { + const handler = await getHandler(); + + await handler(buildMsg({ campaignId: 'x' })); + + expect(broker.nack).toHaveBeenCalledWith(expect.anything(), false); + expect(invalidateStatusCache).not.toHaveBeenCalled(); + expect(runWithCorrelationId).not.toHaveBeenCalled(); + }); +}); diff --git a/src/runners/campaign-sender/consumers/campaigns-control.consumer.ts b/src/runners/campaign-sender/consumers/campaigns-control.consumer.ts new file mode 100644 index 0000000..2cd2c82 --- /dev/null +++ b/src/runners/campaign-sender/consumers/campaigns-control.consumer.ts @@ -0,0 +1,79 @@ +import { Inject, Injectable, OnModuleInit } from '@nestjs/common'; +import { + IMESSAGE_BROKER, + IMessageBroker, + BrokerMessage, +} from '../../../shared/broker/interfaces/message-broker.interface'; +import { + CAMPAIGNS_CONTROL_TOPIC, + isCampaignsControlContract, + type CampaignsControlContract, +} from '../../../shared/broker/contracts/campaigns-control.contract'; +import { CorrelationContext } from '../../../shared/correlation/correlation.context'; +import { CustomLoggerService } from '../../../common/services/custom-logger.service'; +import { processWithAckPolicy } from '../../../shared/broker/consumer/process-with-ack-policy'; +import { CampaignSenderService } from '../services/campaign-sender.service'; + +const LOG_CONTEXT = 'CampaignsControlConsumer'; + +/** + * Broker consumer for `campaigns.control` on the campaign-sender (story 4.8 / + * EVO-1222) — the fast-path half of the hybrid pause/stop design. On any + * control message it drops the campaign's cached status so the next dispatch + * recheck re-reads the authoritative Postgres flag immediately (<1s) instead + * of waiting out the 5s TTL. The Postgres flag stays the source of truth, so a + * lost control message still aborts at the next TTL refresh (FR21–FR24, NFR5). + * + * A structurally invalid payload is dropped up-front (no correlationId to + * bind). The action itself is not branched on: pause, stop and resume all just + * invalidate the cache — the recheck then reads whatever status the REST call + * already wrote. + */ +@Injectable() +export class CampaignsControlConsumer implements OnModuleInit { + constructor( + @Inject(IMESSAGE_BROKER) private readonly broker: IMessageBroker, + private readonly sender: CampaignSenderService, + private readonly correlation: CorrelationContext, + private readonly logger: CustomLoggerService, + ) {} + + async onModuleInit(): Promise { + await this.broker.subscribe( + CAMPAIGNS_CONTROL_TOPIC, + (msg) => this.handle(msg), + ); + this.logger.log(`Subscribed to ${CAMPAIGNS_CONTROL_TOPIC}`, LOG_CONTEXT); + } + + private async handle( + msg: BrokerMessage, + ): Promise { + if (!isCampaignsControlContract(msg.payload)) { + this.logger.warn( + `Invalid ${CAMPAIGNS_CONTROL_TOPIC} payload (messageId=${msg.id}) — nack(requeue=false)`, + LOG_CONTEXT, + ); + await this.broker.nack(msg, false); + return; + } + + const payload = msg.payload; + + await this.correlation.runWithCorrelationId(payload.correlationId, () => + processWithAckPolicy( + msg, + this.broker, + { + logger: this.logger, + context: LOG_CONTEXT, + meta: { campaignId: payload.campaignId, action: payload.action }, + }, + () => { + this.sender.invalidateStatusCache(payload.campaignId); + return Promise.resolve(); + }, + ), + ); + } +} diff --git a/src/runners/campaign-sender/services/campaign-sender.service.ts b/src/runners/campaign-sender/services/campaign-sender.service.ts index c704ad4..7523980 100644 --- a/src/runners/campaign-sender/services/campaign-sender.service.ts +++ b/src/runners/campaign-sender/services/campaign-sender.service.ts @@ -92,6 +92,16 @@ export class CampaignSenderService { return this.db.getRepository(CampaignContact); } + /** + * EVO-1222 [4.8]: drop the cached status for a campaign so the next dispatch + * recheck re-reads the authoritative Postgres flag immediately instead of + * waiting out the TTL. Invoked by the `campaigns.control` consumer on + * pause/stop/resume — the fast-path half of the hybrid design. + */ + invalidateStatusCache(campaignId: string): void { + this.statusCache.delete(campaignId); + } + async send(payload: CampaignsSendContract): Promise { const { campaignId, page, totalPages } = payload; const result: SendResult = { diff --git a/src/shared/broker/adapters/kafka-broker.adapter.ts b/src/shared/broker/adapters/kafka-broker.adapter.ts index bd26bc6..b196306 100644 --- a/src/shared/broker/adapters/kafka-broker.adapter.ts +++ b/src/shared/broker/adapters/kafka-broker.adapter.ts @@ -17,6 +17,7 @@ import { } from '../interfaces/message-broker.interface'; import { BrokerType } from '../types/broker-type.enum'; import { BrokerMetrics } from '../metrics/broker-metrics'; +import { CAMPAIGNS_CONTROL_TOPIC } from '../contracts/campaigns-control.contract'; import { DELIVERY_ATTEMPT_HEADER, DLQ_REASON_HEADER, @@ -30,6 +31,16 @@ import { const CLIENT_ID = 'evo-flow-broker'; const DEFAULT_NUM_PARTITIONS = 12; const DEFAULT_REPLICATION_FACTOR = 1; + +// EVO-1222 [4.8]: per-topic provisioning overrides. `campaigns.control` carries +// ordered pause/resume signals per campaign, so it is single-partition (global +// ordering) with short retention — it is a fast-path, not a history log. +const TOPIC_CONFIG_OVERRIDES: Record< + string, + { numPartitions?: number; retentionMs?: number } +> = { + [CAMPAIGNS_CONTROL_TOPIC]: { numPartitions: 1, retentionMs: 86_400_000 }, +}; const CONNECT_RETRY_BUDGET_MS = 30_000; const CONNECT_RETRY_MAX_BACKOFF_MS = 15_000; const TOPIC_ALREADY_EXISTS_PATTERN = /already exists/i; @@ -490,13 +501,19 @@ export class KafkaBrokerAdapter private async ensureTopicExists(topic: string): Promise { if (this.ensuredTopics.has(topic)) return; + const override = TOPIC_CONFIG_OVERRIDES[topic]; try { await this.admin!.createTopics({ topics: [ { topic, - numPartitions: DEFAULT_NUM_PARTITIONS, + numPartitions: override?.numPartitions ?? DEFAULT_NUM_PARTITIONS, replicationFactor: DEFAULT_REPLICATION_FACTOR, + ...(override?.retentionMs != null && { + configEntries: [ + { name: 'retention.ms', value: String(override.retentionMs) }, + ], + }), }, ], waitForLeaders: true, From 806926a6508b79750c55b77bf71c6d5d45e5d061 Mon Sep 17 00:00:00 2001 From: Nickolas Oliveira Date: Fri, 12 Jun 2026 12:38:06 -0300 Subject: [PATCH 2/2] =?UTF-8?q?fix(evo-flow):=20address=20H1=20=E2=80=94?= =?UTF-8?q?=20mint=20uuid4=20for=20campaigns.control=20correlationId=20(EV?= =?UTF-8?q?O-1222)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The control event carried `resolveIncoming(getCorrelationId())`, which preserves non-UUIDv4 request tokens (SAFE_CORRELATION_ID is looser than v4). The contract is `z.uuidv4()` strict, so both consumers nack(requeue=false) a message the system itself produced — the <1s fast-path silently degraded to the 5s TTL fallback whenever the inbound X-Correlation-Id was non-v4. Mint a fresh UUID v4 per control event instead, matching the campaigns.pack producer (correlation ids are producer-minted across the pipeline). Drop the now-unused CorrelationContext dependency. The service spec no longer mocks id resolution; it validates the published payload through isCampaignsControlContract (the exact check both consumers run) plus an explicit v4-format regression. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../services/campaigns.service.spec.ts | 71 +++++++++++-------- .../campaigns/services/campaigns.service.ts | 15 ++-- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/src/modules/campaigns/services/campaigns.service.spec.ts b/src/modules/campaigns/services/campaigns.service.spec.ts index a40de88..0b06538 100644 --- a/src/modules/campaigns/services/campaigns.service.spec.ts +++ b/src/modules/campaigns/services/campaigns.service.spec.ts @@ -1,6 +1,12 @@ import { CampaignsService } from './campaigns.service'; import { Campaign, CampaignStatus } from '../entities/campaign.entity'; -import { CAMPAIGNS_CONTROL_TOPIC } from '../../../shared/broker/contracts/campaigns-control.contract'; +import { + CAMPAIGNS_CONTROL_TOPIC, + isCampaignsControlContract, +} from '../../../shared/broker/contracts/campaigns-control.contract'; + +const UUID_V4 = + /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; /** * EVO-1222 [4.8]: the status-transition methods publish the fast-path @@ -10,12 +16,6 @@ describe('CampaignsService — campaigns.control publishing', () => { let service: CampaignsService; let repo: { findOne: jest.Mock; save: jest.Mock }; let broker: { publish: jest.Mock }; - let correlation: { - getCorrelationId: jest.Mock; - resolveIncoming: jest.Mock; - }; - - const CORRELATION_ID = '11111111-1111-4111-8111-111111111111'; beforeEach(() => { repo = { @@ -23,18 +23,17 @@ describe('CampaignsService — campaigns.control publishing', () => { save: jest.fn().mockImplementation((c: Campaign) => Promise.resolve(c)), }; broker = { publish: jest.fn() }; - correlation = { - getCorrelationId: jest.fn().mockReturnValue(CORRELATION_ID), - resolveIncoming: jest.fn().mockReturnValue(CORRELATION_ID), - }; const db = { getRepository: jest.fn().mockReturnValue(repo) }; - service = new CampaignsService(db as any, broker as any, correlation as any); + service = new CampaignsService(db as any, broker as any); }); const seed = (status: CampaignStatus) => repo.findOne.mockResolvedValueOnce({ id: 'camp-1', status } as Campaign); - it('AC1: pause publishes a pause control event after persisting PAUSED', async () => { + const lastControl = (): [string, unknown] => + broker.publish.mock.calls.at(-1) as [string, unknown]; + + it('AC1: pause publishes a contract-valid pause control event after persisting PAUSED', async () => { seed(CampaignStatus.SENDING); await service.pause('camp-1'); @@ -42,35 +41,47 @@ describe('CampaignsService — campaigns.control publishing', () => { expect(repo.save).toHaveBeenCalledWith( expect.objectContaining({ status: CampaignStatus.PAUSED }), ); - expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, { - campaignId: 'camp-1', - action: 'pause', - correlationId: CORRELATION_ID, - }); + const [topic, payload] = lastControl(); + expect(topic).toBe(CAMPAIGNS_CONTROL_TOPIC); + expect(payload).toMatchObject({ campaignId: 'camp-1', action: 'pause' }); + expect(isCampaignsControlContract(payload)).toBe(true); }); - it('AC3: resume publishes a resume control event', async () => { + it('AC3: resume publishes a contract-valid resume control event', async () => { seed(CampaignStatus.PAUSED); await service.resume('camp-1'); - expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, { - campaignId: 'camp-1', - action: 'resume', - correlationId: CORRELATION_ID, - }); + const [topic, payload] = lastControl(); + expect(topic).toBe(CAMPAIGNS_CONTROL_TOPIC); + expect(payload).toMatchObject({ campaignId: 'camp-1', action: 'resume' }); + expect(isCampaignsControlContract(payload)).toBe(true); }); - it('AC4: stop publishes a stop control event', async () => { + it('AC4: stop publishes a contract-valid stop control event', async () => { seed(CampaignStatus.SENDING); await service.stop('camp-1'); - expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, { - campaignId: 'camp-1', - action: 'stop', - correlationId: CORRELATION_ID, - }); + const [topic, payload] = lastControl(); + expect(topic).toBe(CAMPAIGNS_CONTROL_TOPIC); + expect(payload).toMatchObject({ campaignId: 'camp-1', action: 'stop' }); + expect(isCampaignsControlContract(payload)).toBe(true); + }); + + // Regression for the review HIGH: the correlationId must be a freshly minted + // UUID v4 (which the z.uuidv4() contract — and therefore both consumers — + // accepts), NOT the request CLS id, which SAFE_CORRELATION_ID may preserve as + // a non-v4 token that both consumers would reject as malformed. + it('mints a fresh uuid v4 correlationId the consumers accept', async () => { + seed(CampaignStatus.SENDING); + + await service.pause('camp-1'); + + const [, payload] = lastControl(); + const { correlationId } = payload as { correlationId: string }; + expect(correlationId).toMatch(UUID_V4); + expect(isCampaignsControlContract(payload)).toBe(true); }); it('does not publish when the transition is rejected', async () => { diff --git a/src/modules/campaigns/services/campaigns.service.ts b/src/modules/campaigns/services/campaigns.service.ts index 8621c7a..e825c22 100644 --- a/src/modules/campaigns/services/campaigns.service.ts +++ b/src/modules/campaigns/services/campaigns.service.ts @@ -6,6 +6,7 @@ import { ConflictException, } from '@nestjs/common'; import { Repository, In } from 'typeorm'; +import { randomUUID } from 'crypto'; import { Campaign, CampaignStatus, @@ -21,14 +22,12 @@ import { CAMPAIGNS_CONTROL_TOPIC, type CampaignControlAction, } from '../../../shared/broker/contracts/campaigns-control.contract'; -import { CorrelationContext } from '../../../shared/correlation/correlation.context'; @Injectable() export class CampaignsService { constructor( private readonly db: TenantDbContext, @Inject(IMESSAGE_BROKER) private readonly broker: IMessageBroker, - private readonly correlation: CorrelationContext, ) {} private get campaignRepository(): Repository { @@ -39,7 +38,13 @@ export class CampaignsService { * EVO-1222 [4.8]: publish the fast-path `campaigns.control` event after an * authoritative status transition so packer/sender drop their cached status * and honor the change in <1s (the Postgres flag remains the source of - * truth). Reuses the request correlation id, minting one if absent. + * truth). + * + * correlationId is a freshly minted UUID v4 — the contract is `z.uuidv4()` + * and pipeline correlation ids are producer-minted (matches the + * `campaigns.pack` producer). Propagating the request CLS id would feed a + * possibly non-v4 token (`SAFE_CORRELATION_ID` is looser than v4) that both + * consumers would reject as a malformed payload. */ private async publishControl( campaignId: string, @@ -49,9 +54,7 @@ export class CampaignsService { await this.broker.publish(CAMPAIGNS_CONTROL_TOPIC, { campaignId, action, - correlationId: this.correlation.resolveIncoming( - this.correlation.getCorrelationId(), - ), + correlationId: randomUUID(), }); } catch (err) { // Fast-path only: the authoritative Postgres status was already persisted,