feat(evo-flow): pause/stop fast-path via campaigns.control consumers (EVO-1222) #59
…(EVO-1222)
Story 4.8. The pause/stop/resume REST endpoints already wrote the authoritative
Campaign.status flag and the sender already rechecked it (4.3), but propagation
waited out the 5s status-cache TTL. This adds the broker fast-path so the change
is honored in <1s, keeping the Postgres flag as the source of truth.
- CampaignsService.pause/resume/stop (and bulk) publish campaigns.control
{ campaignId, action, correlationId } after the status save. The publish is
best-effort: a broker outage never fails the transition (the sender honors the
flag at the next TTL recheck), so it cannot trip the controller's workflow
compensation.
- CampaignsControlConsumer on the sender drops the campaign's cached status so
the next dispatch recheck re-reads Postgres immediately.
- CampaignsControlConsumer on the packer aborts an in-flight pagination on
pause/stop and clears the flag on resume (best-effort; the sender guard is
authoritative).
- Kafka adapter provisions campaigns.control single-partition (ordered
pause/resume) with 24h retention.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
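A minimal sketch of the two consumer-side reactions described in the commit message. It assumes a hypothetical in-memory status cache with a 5s TTL on the sender and a per-campaign abort flag on the packer. `StatusCache`, `PaginationAbortRegistry`, `onSenderControl` and the `pause`/`resume`/`stop` action strings are illustrative names, not the actual evo-flow code. On the sender, the message only invalidates the cache so that the next recheck reads Postgres. The status flag itself is never written from the message.

```typescript
// Hypothetical message shape; the real contract is the zod schema in evo-flow.
type ControlAction = "pause" | "resume" | "stop";

interface CampaignsControlMessage {
  campaignId: string;
  action: ControlAction;
  correlationId: string;
}

// Sender side: a TTL cache in front of the authoritative Postgres status read.
class StatusCache {
  private readonly entries = new Map<string, { status: string; expiresAt: number }>();
  private readonly load: (campaignId: string) => Promise<string>;
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(
    load: (campaignId: string) => Promise<string>, // e.g. reads Campaign.status from Postgres
    ttlMs = 5_000,
    now: () => number = () => Date.now(),
  ) {
    this.load = load;
    this.ttlMs = ttlMs;
    this.now = now;
  }

  async get(campaignId: string): Promise<string> {
    const hit = this.entries.get(campaignId);
    if (hit && hit.expiresAt > this.now()) return hit.status; // still fresh: no DB read
    const status = await this.load(campaignId); // miss or expired: re-read the source of truth
    this.entries.set(campaignId, { status, expiresAt: this.now() + this.ttlMs });
    return status;
  }

  // Fast-path: forget the entry so the next recheck goes straight to Postgres.
  invalidate(campaignId: string): void {
    this.entries.delete(campaignId);
  }
}

// Packer side: best-effort abort flags, checked between pagination pages.
class PaginationAbortRegistry {
  private readonly aborted = new Set<string>();

  shouldAbort(campaignId: string): boolean {
    return this.aborted.has(campaignId);
  }

  apply(msg: CampaignsControlMessage): void {
    if (msg.action === "resume") this.aborted.delete(msg.campaignId);
    else this.aborted.add(msg.campaignId); // pause or stop
  }
}

// Sender handler: every action only invalidates; the fresh status read decides what happens next.
function onSenderControl(cache: StatusCache, msg: CampaignsControlMessage): void {
  cache.invalidate(msg.campaignId);
}
```

Without the invalidation, a status change made at t=1s would be seen only after the cached entry expires at t=5s. With it, the very next `get` call returns the new value.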
dpaes left a comment
Reviewed — requesting changes 🔴
The five ACs are logically delivered and the best-effort publish (the HIGH self-fix) checks out — a broker outage can't fail the committed transition or trip the controller compensation. But there's one HIGH finding that silently defeats the story's headline deliverable (the <1s fast-path), independently verified against the code at the head ref.
HIGH — campaigns.control publisher emits a correlationId its own consumers reject.
- publishControl (campaigns.service.ts:153-157) publishes correlationId = correlation.resolveIncoming(correlation.getCorrelationId()). resolveIncoming preserves any inbound value matching SAFE_CORRELATION_ID = /^[A-Za-z0-9._:-]{1,128}$/ verbatim, i.e. non-v4 tokens (abc-123, uuid v1/v7) pass through. RequestContextMiddleware seeds the CLS correlationId from the inbound X-Correlation-Id via the same resolveIncoming, so an upstream non-v4 id propagates straight into the payload.
- The contract validates correlationId: z.uuidv4() (.strict). Both consumers (packer and sender) safeParse against it. A non-v4 id fails validation → nack(requeue=false) plus an "Invalid payload" warning, on a message the system itself produced. contracts.spec.ts:134-143 explicitly asserts that the guard rejects both a non-UUID and a non-v4 UUID.
- Net: whenever the request carries a non-UUID-v4 X-Correlation-Id, both consumers drop the control message and the <1s fast-path silently no-ops, degrading to the 5s TTL fallback. This is not data loss (the authoritative Postgres flag still pauses within NFR5), but it defeats the exact value this story exists to deliver. The new specs mask it: campaigns.service.spec mocks resolveIncoming to a hardcoded valid UUID, and both consumer specs use a literal v4.
- This is the same SAFE_CORRELATION_ID vs z.uuidv4() drift already seen on EVO-1209. campaigns.pack avoids it by minting a fresh uuid4() in the workflow. #59 is the first producer to feed the request CLS id into a z.uuidv4 contract.
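The drift can be reproduced with the two patterns alone. SAFE_CORRELATION_ID is copied from the finding above. UUID_V4 is a hand-written approximation of what a strict v4 check such as z.uuidv4() accepts: version nibble 4 and an RFC 4122 variant nibble. It is an assumption, not zod's exact internal regex. `wouldBeDroppedByConsumers` is a hypothetical helper name.

```typescript
// Copied from the review: the request-side correlation id guard used by resolveIncoming.
const SAFE_CORRELATION_ID = /^[A-Za-z0-9._:-]{1,128}$/;

// Approximation of a strict UUID v4 check (version nibble 4, variant nibble 8/9/a/b).
const UUID_V4 = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

// True when an inbound id survives resolveIncoming-style filtering but would fail a
// z.uuidv4() contract, which is the nack(requeue=false) case described above.
function wouldBeDroppedByConsumers(inboundId: string): boolean {
  return SAFE_CORRELATION_ID.test(inboundId) && !UUID_V4.test(inboundId);
}

const samples = [
  "abc-123", // arbitrary upstream token
  "6ba7b810-9dad-11d1-80b4-00c04fd430c8", // UUID v1
  "0190b3a2-7c1e-7d4f-9a2b-3c4d5e6f7a8b", // UUID v7
  "9b2f1c3e-4d5a-4b6c-8d7e-0f1a2b3c4d5e", // UUID v4
];
for (const id of samples) {
  console.log(id, wouldBeDroppedByConsumers(id) ? "dropped" : "accepted");
}
```

Only the last sample is accepted. All three of the others are valid request ids that the control consumers would reject.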
Fix: either mint a fresh UUID for the control event's correlationId (as campaigns.pack does) and keep the request id in a separate field/log, or relax the contract to z.string().regex(SAFE_CORRELATION_ID) consistently across the sibling pack/send/tracked contracts. Add a regression spec that drives a non-UUID correlationId end-to-end (publish → guard) instead of a hardcoded UUID.
Non-blocking (verified, not introduced here): the pre-existing red campaigns.controller.spec.ts (line 98, 2-arg stop expectation) and the create()/findAll() eslint findings are not in this PR's diff — your "pre-existing" call holds. Separately, SENDING_TESTAB campaigns aren't pausable via the pre-existing state machine — worth a product call on whether A/B-test runs should support pause/stop.
Leaving the PR open for the correlationId fix.
…ionId (EVO-1222)
The control event carried `resolveIncoming(getCorrelationId())`, which preserves non-UUIDv4 request tokens (SAFE_CORRELATION_ID is looser than v4). The contract is `z.uuidv4()` strict, so both consumers nack(requeue=false) a message the system itself produced: the <1s fast-path silently degraded to the 5s TTL fallback whenever the inbound X-Correlation-Id was non-v4.
Mint a fresh UUID v4 per control event instead, matching the campaigns.pack producer (correlation ids are producer-minted across the pipeline). Drop the now-unused CorrelationContext dependency.
The service spec no longer mocks id resolution; it validates the published payload through isCampaignsControlContract (the exact check both consumers run) plus an explicit v4-format regression.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
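A minimal sketch of the fixed producer. It assumes a hypothetical `ControlPublisher` broker port with an async `publish(topic, payload)` method and an injectable `warn` logger, and the real CampaignsService wiring differs. It shows the two properties the thread relies on. The correlation id is minted per event with Node's crypto.randomUUID(), which returns a UUID v4. A broker failure is logged and swallowed, so the already-committed status transition never fails.

```typescript
import { randomUUID } from "node:crypto";

type ControlAction = "pause" | "resume" | "stop";

interface CampaignsControlPayload {
  campaignId: string;
  action: ControlAction;
  correlationId: string;
}

// Hypothetical broker port; stands in for the evo-flow broker adapter.
interface ControlPublisher {
  publish(topic: string, payload: CampaignsControlPayload): Promise<void>;
}

// Meant to be called only after the Campaign.status save has committed.
async function publishControl(
  broker: ControlPublisher,
  campaignId: string,
  action: ControlAction,
  warn: (msg: string) => void = console.warn,
): Promise<CampaignsControlPayload> {
  // Producer-minted id: never the inbound X-Correlation-Id, which may not be v4.
  const payload: CampaignsControlPayload = { campaignId, action, correlationId: randomUUID() };
  try {
    await broker.publish("campaigns.control", payload);
  } catch (err) {
    // Best-effort: the sender still honors the Postgres flag at its next TTL recheck.
    warn(`campaigns.control publish failed for ${campaignId}: ${String(err)}`);
  }
  return payload;
}
```

Returning the payload is a convenience of this sketch: a spec can feed it straight into the consumers' validator instead of mocking the id.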
dpaes left a comment
Re-review — fix verified, approved ✅
806926a resolves the HIGH from the previous round. Verified against the head ref:
- Root-cause fix is correct. publishControl now sets correlationId: randomUUID() (Node crypto.randomUUID() → UUID v4), so the producer no longer feeds the request CLS id into the z.uuidv4() contract. This matches the campaigns.pack producer (producer-minted ids across the pipeline), so the packer/sender consumers can no longer reject a message because of a non-v4 upstream X-Correlation-Id. Contract and consumers are unchanged, correctly so, since they are the pipeline standard.
- Clean removal. The now-unused CorrelationContext dependency is gone from the import and constructor. No orphan this.correlation reference remains, and no other call site constructs CampaignsService with the old 3-arg signature.
- Specs un-masked, real guard now. The service spec drops the correlation mock and asserts that each published payload passes isCampaignsControlContract(payload) (the exact validator both consumers run). It also adds an explicit "mints a fresh uuid v4 correlationId" regression test. A revert to resolveIncoming would now turn these red.
- Best-effort publish, state machine, and tabular idempotency are unchanged from the prior round (already verified).
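For readers without zod at hand, here is a dependency-free sketch of what a strict guard like isCampaignsControlContract checks. It requires exactly the three keys (`.strict()` rejects extras), a known action, and a UUID v4 correlation id. The name `isControlPayload`, the action strings, the "non-empty string" rule for campaignId and the v4 regex are assumptions. The real guard is the zod schema in the contracts module. The regression test the re-review describes amounts to running the producer's real output through such a guard rather than through a mocked id.

```typescript
const UUID_V4 = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
const ACTIONS = new Set(["pause", "resume", "stop"]);
const EXPECTED_KEYS = ["action", "campaignId", "correlationId"]; // already in sorted order

// Hypothetical stand-in for z.object({ ... }).strict() with a z.uuidv4() correlationId.
function isControlPayload(value: unknown): boolean {
  if (typeof value !== "object" || value === null || Array.isArray(value)) return false;
  const record = value as Record<string, unknown>;

  // .strict(): no missing keys and no extra keys.
  const keys = Object.keys(record).sort();
  if (keys.length !== EXPECTED_KEYS.length || keys.some((k, i) => k !== EXPECTED_KEYS[i])) {
    return false;
  }

  const { campaignId, action, correlationId } = record;
  return (
    typeof campaignId === "string" && campaignId.length > 0 &&
    typeof action === "string" && ACTIONS.has(action) &&
    typeof correlationId === "string" && UUID_V4.test(correlationId)
  );
}
```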
Non-blocking note: the control event no longer carries the originating request's correlation id (the strict contract has no field for it, and this mirrors campaigns.pack). If request→control tracing is wanted later, that's a transport-header/log concern, not this payload.
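If request-to-control tracing is wanted later, the reviewer's "transport header" suggestion could look roughly like the sketch below. It assumes a broker client that accepts per-message string headers alongside the serialized value, as Kafka clients such as kafkajs do. The `OutboundRecord` type, `buildControlRecord` and the `x-request-correlation-id` header name are hypothetical. The payload keeps exactly the three strict-contract fields, so the consumers' guard is unaffected.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical outbound record: a serialized value plus string headers.
interface OutboundRecord {
  value: string;
  headers: Record<string, string>;
}

function buildControlRecord(
  campaignId: string,
  action: "pause" | "resume" | "stop",
  requestCorrelationId?: string,
): OutboundRecord {
  // Payload stays contract-shaped, with a producer-minted v4 id.
  const payload = { campaignId, action, correlationId: randomUUID() };
  const headers: Record<string, string> = {};
  if (requestCorrelationId !== undefined) {
    // Hypothetical header name; the payload guard never sees it, so a non-v4 token is harmless here.
    headers["x-request-correlation-id"] = requestCorrelationId;
  }
  return { value: JSON.stringify(payload), headers };
}
```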
The previously-flagged pre-existing items (red campaigns.controller.spec.ts, create()/findAll() eslint, SENDING_TESTAB non-pausable) remain out of scope as agreed.
Summary
Story 4.8: the broker fast-path half of the hybrid pause/stop design. The REST pause/stop/resume endpoints already wrote the authoritative Campaign.status and the sender already rechecked it (4.3), but propagation waited out the 5s status-cache TTL. This publishes campaigns.control so the change is honored in <1s, with the Postgres flag remaining the source of truth.
- CampaignsService.pause/resume/stop (+ bulk) publish campaigns.control { campaignId, action, correlationId } after the status save. Best-effort publish: a broker outage never fails the transition (the sender honors the flag at its next TTL recheck), so it cannot trip the controller's workflow compensation.
- CampaignsControlConsumer (sender) drops the campaign's cached status → the next dispatch recheck re-reads Postgres immediately.
- CampaignsControlConsumer (packer) aborts an in-flight pagination on pause/stop and clears the flag on resume (best-effort; the sender guard is authoritative).
- campaigns.control single-partition (ordered pause/resume) + 24h retention.
Security
- correlationId is minted fresh (UUID v4) per control event; the inbound request's X-Correlation-Id is no longer copied into the payload.
Test plan
- evo-flow: npm run typecheck → clean
- evo-flow: npx jest campaigns-control.consumer campaigns.service.spec campaign-packer.service.spec campaign-sender.service.spec kafka-broker.adapter → 68 passed
- evo-flow: npx eslint <changed sources> → clean (no new findings)
Changed Files
- src/modules/campaigns/services/campaigns.service.ts (+spec): publish on transition (best-effort)
- src/runners/campaign-sender/{consumers/campaigns-control.consumer.ts (+spec), services/campaign-sender.service.ts, campaign-sender.module.ts}: cache invalidation
- src/runners/campaign-packer/{consumers/campaigns-control.consumer.ts (+spec), services/campaign-packer.service.ts (+spec), campaign-packer.module.ts}: pagination abort
- src/shared/broker/adapters/kafka-broker.adapter.ts: per-topic partition/retention override
AC re-check (5/5)
- {campaignId, action, correlationId} validated by both consumers. ✓
Self-review (code review pass)
Known limitations (by design — degrade gracefully to the authoritative flag)
- If campaigns.control already exists (auto-created at 12 partitions), the 1-partition override does not retroactively apply (and Kafka cannot reduce partitions). Ordering is non-critical here because the authoritative recheck guarantees correctness.
- campaigns.controller.spec.ts has one red test (stop expects a 2-arg signature), and campaigns.service.ts has 4 pre-existing eslint findings in create()/findAll(). The crm/evo-flow CI does not gate jest/eslint.
Linked Issue
🤖 Generated with Claude Code