Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/modules/campaigns/services/campaigns.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { CampaignsService } from './campaigns.service';
import { Campaign, CampaignStatus } from '../entities/campaign.entity';
import { CAMPAIGNS_CONTROL_TOPIC } from '../../../shared/broker/contracts/campaigns-control.contract';

/**
* EVO-1222 [4.8]: the status-transition methods publish the fast-path
* `campaigns.control` event after writing the authoritative Postgres flag.
*/
describe('CampaignsService — campaigns.control publishing', () => {
let service: CampaignsService;
let repo: { findOne: jest.Mock; save: jest.Mock };
let broker: { publish: jest.Mock };
let correlation: {
getCorrelationId: jest.Mock;
resolveIncoming: jest.Mock;
};

const CORRELATION_ID = '11111111-1111-4111-8111-111111111111';

beforeEach(() => {
repo = {
findOne: jest.fn(),
save: jest.fn().mockImplementation((c: Campaign) => Promise.resolve(c)),
};
broker = { publish: jest.fn() };
correlation = {
getCorrelationId: jest.fn().mockReturnValue(CORRELATION_ID),
resolveIncoming: jest.fn().mockReturnValue(CORRELATION_ID),
};
const db = { getRepository: jest.fn().mockReturnValue(repo) };
service = new CampaignsService(db as any, broker as any, correlation as any);
});

const seed = (status: CampaignStatus) =>
repo.findOne.mockResolvedValueOnce({ id: 'camp-1', status } as Campaign);

it('AC1: pause publishes a pause control event after persisting PAUSED', async () => {
seed(CampaignStatus.SENDING);

await service.pause('camp-1');

expect(repo.save).toHaveBeenCalledWith(
expect.objectContaining({ status: CampaignStatus.PAUSED }),
);
expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, {
campaignId: 'camp-1',
action: 'pause',
correlationId: CORRELATION_ID,
});
});

it('AC3: resume publishes a resume control event', async () => {
seed(CampaignStatus.PAUSED);

await service.resume('camp-1');

expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, {
campaignId: 'camp-1',
action: 'resume',
correlationId: CORRELATION_ID,
});
});

it('AC4: stop publishes a stop control event', async () => {
seed(CampaignStatus.SENDING);

await service.stop('camp-1');

expect(broker.publish).toHaveBeenCalledWith(CAMPAIGNS_CONTROL_TOPIC, {
campaignId: 'camp-1',
action: 'stop',
correlationId: CORRELATION_ID,
});
});

it('does not publish when the transition is rejected', async () => {
seed(CampaignStatus.DRAFT); // pause requires SENDING

await expect(service.pause('camp-1')).rejects.toThrow();
expect(broker.publish).not.toHaveBeenCalled();
});

it('does not fail the transition when the fast-path publish throws (authoritative flag already persisted)', async () => {
seed(CampaignStatus.SENDING);
broker.publish.mockRejectedValueOnce(new Error('broker unavailable'));
const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {});

const result = await service.pause('camp-1');

expect(result).toEqual(
expect.objectContaining({ status: CampaignStatus.PAUSED }),
);
expect(warnSpy).toHaveBeenCalled();
warnSpy.mockRestore();
});
});
63 changes: 59 additions & 4 deletions src/modules/campaigns/services/campaigns.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
Inject,
Injectable,
NotFoundException,
BadRequestException,
Expand All @@ -12,15 +13,61 @@ import {
} from '../entities/campaign.entity';
import { CreateCampaignDto, UpdateCampaignDto, CampaignQueryDto } from '../dto';
import { TenantDbContext } from '../../../evo-extension-points';
import {
IMESSAGE_BROKER,
IMessageBroker,
} from '../../../shared/broker/interfaces/message-broker.interface';
import {
CAMPAIGNS_CONTROL_TOPIC,
type CampaignControlAction,
} from '../../../shared/broker/contracts/campaigns-control.contract';
import { CorrelationContext } from '../../../shared/correlation/correlation.context';

@Injectable()
export class CampaignsService {
constructor(private readonly db: TenantDbContext) {}
constructor(
private readonly db: TenantDbContext,
@Inject(IMESSAGE_BROKER) private readonly broker: IMessageBroker,
private readonly correlation: CorrelationContext,
) {}

private get campaignRepository(): Repository<Campaign> {
return this.db.getRepository(Campaign);
}

/**
* EVO-1222 [4.8]: publish the fast-path `campaigns.control` event after an
* authoritative status transition so packer/sender drop their cached status
* and honor the change in <1s (the Postgres flag remains the source of
* truth). Reuses the request correlation id, minting one if absent.
*/
private async publishControl(
campaignId: string,
action: CampaignControlAction,
): Promise<void> {
try {
await this.broker.publish(CAMPAIGNS_CONTROL_TOPIC, {
campaignId,
action,
correlationId: this.correlation.resolveIncoming(
this.correlation.getCorrelationId(),
),
});
} catch (err) {
// Fast-path only: the authoritative Postgres status was already persisted,
// so a broker outage must NOT fail the transition (nor trip the
// controller's workflow compensation). The sender honors the flag at its
// next recheck (≤5s TTL, within NFR5). Reported via console to match this
// service's existing error-reporting style.
console.warn(
`[campaigns.control] publish failed for campaign ${campaignId} ` +
`(${action}); relying on the authoritative status flag: ${
(err as Error).message
}`,
);
}
}

async create(createCampaignDto: CreateCampaignDto): Promise<Campaign> {
// Check for duplicate name
const existingCampaign = await this.campaignRepository.findOne({
Expand Down Expand Up @@ -188,7 +235,9 @@ export class CampaignsService {
}

campaign.status = CampaignStatus.PAUSED;
return this.campaignRepository.save(campaign);
const saved = await this.campaignRepository.save(campaign);
await this.publishControl(id, 'pause');
return saved;
}

async resume(id: string): Promise<Campaign> {
Expand All @@ -201,7 +250,9 @@ export class CampaignsService {
}

campaign.status = CampaignStatus.SENDING;
return this.campaignRepository.save(campaign);
const saved = await this.campaignRepository.save(campaign);
await this.publishControl(id, 'resume');
return saved;
}

async stop(id: string): Promise<Campaign> {
Expand All @@ -219,7 +270,9 @@ export class CampaignsService {
}

campaign.status = CampaignStatus.STOPPED;
return this.campaignRepository.save(campaign);
const saved = await this.campaignRepository.save(campaign);
await this.publishControl(id, 'stop');
return saved;
}

async duplicate(id: string): Promise<Campaign> {
Expand Down Expand Up @@ -267,12 +320,14 @@ export class CampaignsService {
if (campaign.status === CampaignStatus.SENDING) {
campaign.status = CampaignStatus.PAUSED;
await this.campaignRepository.save(campaign);
await this.publishControl(campaign.id, 'pause');
affectedCount++;
}
} else if (action === 'resume') {
if (campaign.status === CampaignStatus.PAUSED) {
campaign.status = CampaignStatus.SENDING;
await this.campaignRepository.save(campaign);
await this.publishControl(campaign.id, 'resume');
affectedCount++;
}
} else if (action === 'delete') {
Expand Down
8 changes: 7 additions & 1 deletion src/runners/campaign-packer/campaign-packer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
import { CampaignPackerService } from './services/campaign-packer.service';
import { PaginationService } from './services/pagination.service';
import { CampaignsPackConsumer } from './consumers/campaigns-pack.consumer';
import { CampaignsControlConsumer } from './consumers/campaigns-control.consumer';

/**
* Runner module for RUN_MODE=campaign-packer (story 4.1 / EVO-1215).
Expand All @@ -14,6 +15,11 @@ import { CampaignsPackConsumer } from './consumers/campaigns-pack.consumer';
* AppModule.forRoot() when AppFactory.shouldStartCampaignPacker() is true.
*/
@Module({
providers: [CampaignPackerService, PaginationService, CampaignsPackConsumer],
providers: [
CampaignPackerService,
PaginationService,
CampaignsPackConsumer,
CampaignsControlConsumer,
],
})
export class CampaignPackerModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { CampaignsControlConsumer } from './campaigns-control.consumer';
import {
CAMPAIGNS_CONTROL_TOPIC,
type CampaignsControlContract,
} from '../../../shared/broker/contracts/campaigns-control.contract';
import type { BrokerMessage } from '../../../shared/broker/interfaces/message-broker.interface';

const buildMsg = (
payload: unknown,
): BrokerMessage<CampaignsControlContract> => ({
id: 'm1',
payload: payload as CampaignsControlContract,
headers: {},
raw: {},
});

const control = (
action: CampaignsControlContract['action'],
): CampaignsControlContract => ({
campaignId: 'camp-1',
action,
correlationId: '11111111-1111-4111-8111-111111111111',
});

describe('CampaignsControlConsumer (campaign-packer)', () => {
let consumer: CampaignsControlConsumer;
let broker: { subscribe: jest.Mock; ack: jest.Mock; nack: jest.Mock };
let markPaginationAborted: jest.Mock;
let clearPaginationAborted: jest.Mock;
let runWithCorrelationId: jest.Mock;
let logger: { log: jest.Mock; warn: jest.Mock; error: jest.Mock };

beforeEach(() => {
broker = { subscribe: jest.fn(), ack: jest.fn(), nack: jest.fn() };
markPaginationAborted = jest.fn();
clearPaginationAborted = jest.fn();
runWithCorrelationId = jest.fn((_id: string, fn: () => unknown) => fn());
logger = { log: jest.fn(), warn: jest.fn(), error: jest.fn() };
consumer = new CampaignsControlConsumer(
broker as any,
{ markPaginationAborted, clearPaginationAborted } as any,
{ runWithCorrelationId } as any,
logger as any,
);
});

async function getHandler() {
await consumer.onModuleInit();
return broker.subscribe.mock.calls[0][1] as (
m: BrokerMessage<CampaignsControlContract>,
) => Promise<void>;
}

it('AC5: subscribes to campaigns.control on module init', async () => {
await consumer.onModuleInit();
expect(broker.subscribe).toHaveBeenCalledWith(
CAMPAIGNS_CONTROL_TOPIC,
expect.any(Function),
);
});

it.each(['pause', 'stop'] as const)(
'AC1/AC4: marks pagination aborted on %s and acks',
async (action) => {
const handler = await getHandler();

await handler(buildMsg(control(action)));

expect(markPaginationAborted).toHaveBeenCalledWith('camp-1');
expect(clearPaginationAborted).not.toHaveBeenCalled();
expect(broker.ack).toHaveBeenCalledTimes(1);
},
);

it('AC3: clears the abort flag on resume', async () => {
const handler = await getHandler();

await handler(buildMsg(control('resume')));

expect(clearPaginationAborted).toHaveBeenCalledWith('camp-1');
expect(markPaginationAborted).not.toHaveBeenCalled();
expect(broker.ack).toHaveBeenCalledTimes(1);
});

it('nack(requeue=false) on a malformed payload', async () => {
const handler = await getHandler();

await handler(buildMsg({ campaignId: 'x', action: 'nope' }));

expect(broker.nack).toHaveBeenCalledWith(expect.anything(), false);
expect(markPaginationAborted).not.toHaveBeenCalled();
expect(clearPaginationAborted).not.toHaveBeenCalled();
});
});
Loading
Loading