Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/modules/campaigns/services/campaigns.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { CampaignsService } from './campaigns.service';
import { Campaign, CampaignStatus } from '../entities/campaign.entity';
import {
CAMPAIGNS_CONTROL_TOPIC,
isCampaignsControlContract,
} from '../../../shared/broker/contracts/campaigns-control.contract';

const UUID_V4 =
/^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

/**
* EVO-1222 [4.8]: the status-transition methods publish the fast-path
* `campaigns.control` event after writing the authoritative Postgres flag.
*/
describe('CampaignsService — campaigns.control publishing', () => {
let service: CampaignsService;
let repo: { findOne: jest.Mock; save: jest.Mock };
let broker: { publish: jest.Mock };

beforeEach(() => {
repo = {
findOne: jest.fn(),
save: jest.fn().mockImplementation((c: Campaign) => Promise.resolve(c)),
};
broker = { publish: jest.fn() };
const db = { getRepository: jest.fn().mockReturnValue(repo) };
service = new CampaignsService(db as any, broker as any);
});

const seed = (status: CampaignStatus) =>
repo.findOne.mockResolvedValueOnce({ id: 'camp-1', status } as Campaign);

const lastControl = (): [string, unknown] =>
broker.publish.mock.calls.at(-1) as [string, unknown];

it('AC1: pause publishes a contract-valid pause control event after persisting PAUSED', async () => {
seed(CampaignStatus.SENDING);

await service.pause('camp-1');

expect(repo.save).toHaveBeenCalledWith(
expect.objectContaining({ status: CampaignStatus.PAUSED }),
);
const [topic, payload] = lastControl();
expect(topic).toBe(CAMPAIGNS_CONTROL_TOPIC);
expect(payload).toMatchObject({ campaignId: 'camp-1', action: 'pause' });
expect(isCampaignsControlContract(payload)).toBe(true);
});

it('AC3: resume publishes a contract-valid resume control event', async () => {
seed(CampaignStatus.PAUSED);

await service.resume('camp-1');

const [topic, payload] = lastControl();
expect(topic).toBe(CAMPAIGNS_CONTROL_TOPIC);
expect(payload).toMatchObject({ campaignId: 'camp-1', action: 'resume' });
expect(isCampaignsControlContract(payload)).toBe(true);
});

it('AC4: stop publishes a contract-valid stop control event', async () => {
seed(CampaignStatus.SENDING);

await service.stop('camp-1');

const [topic, payload] = lastControl();
expect(topic).toBe(CAMPAIGNS_CONTROL_TOPIC);
expect(payload).toMatchObject({ campaignId: 'camp-1', action: 'stop' });
expect(isCampaignsControlContract(payload)).toBe(true);
});

// Regression for the review HIGH: the correlationId must be a freshly minted
// UUID v4 (which the z.uuidv4() contract — and therefore both consumers —
// accepts), NOT the request CLS id, which SAFE_CORRELATION_ID may preserve as
// a non-v4 token that both consumers would reject as malformed.
it('mints a fresh uuid v4 correlationId the consumers accept', async () => {
seed(CampaignStatus.SENDING);

await service.pause('camp-1');

const [, payload] = lastControl();
const { correlationId } = payload as { correlationId: string };
expect(correlationId).toMatch(UUID_V4);
expect(isCampaignsControlContract(payload)).toBe(true);
});

it('does not publish when the transition is rejected', async () => {
seed(CampaignStatus.DRAFT); // pause requires SENDING

await expect(service.pause('camp-1')).rejects.toThrow();
expect(broker.publish).not.toHaveBeenCalled();
});

it('does not fail the transition when the fast-path publish throws (authoritative flag already persisted)', async () => {
seed(CampaignStatus.SENDING);
broker.publish.mockRejectedValueOnce(new Error('broker unavailable'));
const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {});

const result = await service.pause('camp-1');

expect(result).toEqual(
expect.objectContaining({ status: CampaignStatus.PAUSED }),
);
expect(warnSpy).toHaveBeenCalled();
warnSpy.mockRestore();
});
});
66 changes: 62 additions & 4 deletions src/modules/campaigns/services/campaigns.service.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,76 @@
import {
Inject,
Injectable,
NotFoundException,
BadRequestException,
ConflictException,
} from '@nestjs/common';
import { Repository, In } from 'typeorm';
import { randomUUID } from 'crypto';
import {
Campaign,
CampaignStatus,
CampaignType,
} from '../entities/campaign.entity';
import { CreateCampaignDto, UpdateCampaignDto, CampaignQueryDto } from '../dto';
import { TenantDbContext } from '../../../evo-extension-points';
import {
IMESSAGE_BROKER,
IMessageBroker,
} from '../../../shared/broker/interfaces/message-broker.interface';
import {
CAMPAIGNS_CONTROL_TOPIC,
type CampaignControlAction,
} from '../../../shared/broker/contracts/campaigns-control.contract';

@Injectable()
export class CampaignsService {
constructor(private readonly db: TenantDbContext) {}
constructor(
private readonly db: TenantDbContext,
@Inject(IMESSAGE_BROKER) private readonly broker: IMessageBroker,
) {}

private get campaignRepository(): Repository<Campaign> {
return this.db.getRepository(Campaign);
}

/**
* EVO-1222 [4.8]: publish the fast-path `campaigns.control` event after an
* authoritative status transition so packer/sender drop their cached status
* and honor the change in <1s (the Postgres flag remains the source of
* truth).
*
* correlationId is a freshly minted UUID v4 — the contract is `z.uuidv4()`
* and pipeline correlation ids are producer-minted (matches the
* `campaigns.pack` producer). Propagating the request CLS id would feed a
* possibly non-v4 token (`SAFE_CORRELATION_ID` is looser than v4) that both
* consumers would reject as a malformed payload.
*/
private async publishControl(
campaignId: string,
action: CampaignControlAction,
): Promise<void> {
try {
await this.broker.publish(CAMPAIGNS_CONTROL_TOPIC, {
campaignId,
action,
correlationId: randomUUID(),
});
} catch (err) {
// Fast-path only: the authoritative Postgres status was already persisted,
// so a broker outage must NOT fail the transition (nor trip the
// controller's workflow compensation). The sender honors the flag at its
// next recheck (≤5s TTL, within NFR5). Reported via console to match this
// service's existing error-reporting style.
console.warn(
`[campaigns.control] publish failed for campaign ${campaignId} ` +
`(${action}); relying on the authoritative status flag: ${
(err as Error).message
}`,
);
}
}

async create(createCampaignDto: CreateCampaignDto): Promise<Campaign> {
// Check for duplicate name
const existingCampaign = await this.campaignRepository.findOne({
Expand Down Expand Up @@ -188,7 +238,9 @@ export class CampaignsService {
}

campaign.status = CampaignStatus.PAUSED;
return this.campaignRepository.save(campaign);
const saved = await this.campaignRepository.save(campaign);
await this.publishControl(id, 'pause');
return saved;
}

async resume(id: string): Promise<Campaign> {
Expand All @@ -201,7 +253,9 @@ export class CampaignsService {
}

campaign.status = CampaignStatus.SENDING;
return this.campaignRepository.save(campaign);
const saved = await this.campaignRepository.save(campaign);
await this.publishControl(id, 'resume');
return saved;
}

async stop(id: string): Promise<Campaign> {
Expand All @@ -219,7 +273,9 @@ export class CampaignsService {
}

campaign.status = CampaignStatus.STOPPED;
return this.campaignRepository.save(campaign);
const saved = await this.campaignRepository.save(campaign);
await this.publishControl(id, 'stop');
return saved;
}

async duplicate(id: string): Promise<Campaign> {
Expand Down Expand Up @@ -267,12 +323,14 @@ export class CampaignsService {
if (campaign.status === CampaignStatus.SENDING) {
campaign.status = CampaignStatus.PAUSED;
await this.campaignRepository.save(campaign);
await this.publishControl(campaign.id, 'pause');
affectedCount++;
}
} else if (action === 'resume') {
if (campaign.status === CampaignStatus.PAUSED) {
campaign.status = CampaignStatus.SENDING;
await this.campaignRepository.save(campaign);
await this.publishControl(campaign.id, 'resume');
affectedCount++;
}
} else if (action === 'delete') {
Expand Down
8 changes: 7 additions & 1 deletion src/runners/campaign-packer/campaign-packer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
import { CampaignPackerService } from './services/campaign-packer.service';
import { PaginationService } from './services/pagination.service';
import { CampaignsPackConsumer } from './consumers/campaigns-pack.consumer';
import { CampaignsControlConsumer } from './consumers/campaigns-control.consumer';

/**
* Runner module for RUN_MODE=campaign-packer (story 4.1 / EVO-1215).
Expand All @@ -14,6 +15,11 @@ import { CampaignsPackConsumer } from './consumers/campaigns-pack.consumer';
* AppModule.forRoot() when AppFactory.shouldStartCampaignPacker() is true.
*/
@Module({
providers: [CampaignPackerService, PaginationService, CampaignsPackConsumer],
providers: [
CampaignPackerService,
PaginationService,
CampaignsPackConsumer,
CampaignsControlConsumer,
],
})
export class CampaignPackerModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { CampaignsControlConsumer } from './campaigns-control.consumer';
import {
CAMPAIGNS_CONTROL_TOPIC,
type CampaignsControlContract,
} from '../../../shared/broker/contracts/campaigns-control.contract';
import type { BrokerMessage } from '../../../shared/broker/interfaces/message-broker.interface';

const buildMsg = (
payload: unknown,
): BrokerMessage<CampaignsControlContract> => ({
id: 'm1',
payload: payload as CampaignsControlContract,
headers: {},
raw: {},
});

const control = (
action: CampaignsControlContract['action'],
): CampaignsControlContract => ({
campaignId: 'camp-1',
action,
correlationId: '11111111-1111-4111-8111-111111111111',
});

describe('CampaignsControlConsumer (campaign-packer)', () => {
let consumer: CampaignsControlConsumer;
let broker: { subscribe: jest.Mock; ack: jest.Mock; nack: jest.Mock };
let markPaginationAborted: jest.Mock;
let clearPaginationAborted: jest.Mock;
let runWithCorrelationId: jest.Mock;
let logger: { log: jest.Mock; warn: jest.Mock; error: jest.Mock };

beforeEach(() => {
broker = { subscribe: jest.fn(), ack: jest.fn(), nack: jest.fn() };
markPaginationAborted = jest.fn();
clearPaginationAborted = jest.fn();
runWithCorrelationId = jest.fn((_id: string, fn: () => unknown) => fn());
logger = { log: jest.fn(), warn: jest.fn(), error: jest.fn() };
consumer = new CampaignsControlConsumer(
broker as any,
{ markPaginationAborted, clearPaginationAborted } as any,
{ runWithCorrelationId } as any,
logger as any,
);
});

async function getHandler() {
await consumer.onModuleInit();
return broker.subscribe.mock.calls[0][1] as (
m: BrokerMessage<CampaignsControlContract>,
) => Promise<void>;
}

it('AC5: subscribes to campaigns.control on module init', async () => {
await consumer.onModuleInit();
expect(broker.subscribe).toHaveBeenCalledWith(
CAMPAIGNS_CONTROL_TOPIC,
expect.any(Function),
);
});

it.each(['pause', 'stop'] as const)(
'AC1/AC4: marks pagination aborted on %s and acks',
async (action) => {
const handler = await getHandler();

await handler(buildMsg(control(action)));

expect(markPaginationAborted).toHaveBeenCalledWith('camp-1');
expect(clearPaginationAborted).not.toHaveBeenCalled();
expect(broker.ack).toHaveBeenCalledTimes(1);
},
);

it('AC3: clears the abort flag on resume', async () => {
const handler = await getHandler();

await handler(buildMsg(control('resume')));

expect(clearPaginationAborted).toHaveBeenCalledWith('camp-1');
expect(markPaginationAborted).not.toHaveBeenCalled();
expect(broker.ack).toHaveBeenCalledTimes(1);
});

it('nack(requeue=false) on a malformed payload', async () => {
const handler = await getHandler();

await handler(buildMsg({ campaignId: 'x', action: 'nope' }));

expect(broker.nack).toHaveBeenCalledWith(expect.anything(), false);
expect(markPaginationAborted).not.toHaveBeenCalled();
expect(clearPaginationAborted).not.toHaveBeenCalled();
});
});
Loading
Loading