feat(campaign-sender): campaigns.send consumer with tabular idempotency and pause/stop recheck (EVO-1217)#47
Merged
Merged
Conversation
Kafka sums high-watermark minus committed offset across partitions for the mode's consumer group; RabbitMQ reads the ready count of the mode's queue. Feeds the consumer_lag gauge required by NFR33. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…RM inbox (EVO-1217) Loads the batch's MessageTemplate once (missing template is terminal), renders legacy-compatible placeholders per contact and delegates delivery to the shared CrmInboxDispatcher (story 2.2). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…use recheck (EVO-1217) Processes one campaigns.send page: skips non-PENDING contacts (FR30/NFR16), rechecks Campaign.status before every dispatch through a 5s TTL cache (FR21-FR24) and records SENT/FAILED via updates conditional on PENDING so replica races never double-mark. 4xx/5xx fail immediately - retry is 4.5. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…s (EVO-1217) Consumer mirrors the packer pattern: contract validation up-front, correlationId wrapping and shared ack/nack policy (terminal -> DLQ). Emits per-message duration (p50/p95/p99), throughput, categorized errors and a 15s background consumer-lag poll via PipelineMetricsService (NFR33). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…1 stub (EVO-1217) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…-closing 404 (EVO-1217) checkQueue on a missing queue raises a channel-level error that closes the adapter's single shared channel, so a best-effort metrics poll could take down publish/ack for the whole process. assertQueue with the consumer's exact declaration is idempotent and returns the same messageCount. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…e, cache bound (EVO-1217) - hydrate only PENDING contacts so a redelivered already-SENT page skips cheaply instead of re-fetching the whole batch from the CRM (NFR16) - dedupe payload contactIds: a repeated id would dispatch twice since the in-memory row stays PENDING after the first send - bound the campaign status cache (prune expired entries past 1000) - log 'campaign contact failed' instead of 'dispatch failed' on the pre-dispatch contact_not_found path; JSON-stringify object-valued custom attributes in template rendering Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
There was a problem hiding this comment.
Sorry @nickoliveira23, you have reached your weekly rate limit of 500000 diff characters.
Please try again later or upgrade to continue using Sourcery
dpaes
approved these changes
Jun 10, 2026
dpaes
left a comment
There was a problem hiding this comment.
✅ Approved (with nits) — EVO-1217
Reviewed adversarially; cloned the PR head to verify load-bearing context. Base develop, MERGEABLE/CLEAN.
AC verification
- AC1–AC5 ✓ — dispatch →
markSentconditional-on-PENDING(replica-race-safe status lock); pending-only hydration; pause/stop rechecked before every dispatch via a bounded 5s TTL cache; 4xx →FAILEDwith reason logged;RUN_MODE=campaign-senderwired (1.1 stub removed), consumer subscribes and stays alive. Producer (campaign-packer) publishes exactly theCampaignsSendContractthe consumer validates; every dependency exists with the assumed signature, incl. the new additiveIMessageBroker.getTopicLagon both Kafka and RabbitMQ adapters. - AC6 ✓ emitted /
⚠️ partial on theGET /metricsHTTP-exposure clause only — worker modes don't start an HTTP server; deferred to EVO-1226 (5.1), accepted with a dated callout. Same pattern as EVO-1223.
Non-blocking (DM'd to the dev):
- [Medium] dispatch-then-claim → cross-replica double-SEND under redelivery (at-least-once; consistent with the card's status-lock model, not a spec violation). Worth a note when retry/rate-limit (4.4/4.5) land.
- [Low]
getTopicLaghas nobroker.contract.speccoverage (the only CI gate that runs real brokers) — its Kafka offset math / RabbitMQassertQueuepath is unit-mocked only. Consumer-group naming verified to matchsubscribe(). - [Nit] AC5 card text names a non-existent script (
start:dev:campaign-sendervs the realdev:campaign-sender);processed_paramsarray-through-Recordcast (behavior-preserving).
Note CI does not run jest/tsc/lint — recommend a local npm test -- src/runners/campaign-sender + npm run typecheck before merge. Squash-merging to develop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
campaign-senderrunner (story 4.3): consumescampaigns.sendpages published by the packer (4.2), hydrates contacts from the CRM and dispatches each PENDING contact through the sharedCrmInboxDispatcher(2.2).CampaignContactStatus(2.3): non-PENDING contacts are skipped (skipped: already sent); SENT/FAILED updates are conditional onstatus='PENDING'so replica races never double-mark (FR30, NFR16).Campaign.statusrechecked before every dispatch through a per-instance 5s TTL cache; on Paused/Stopped the batch aborts, acks and logsaborted: campaign paused(FR21–FR24).RUN_MODE=campaign-senderwired inmain.ts/app.module.ts, replacing the 1.1 stub (FR5); consumer groupcampaign-sender-campaigns.sendsupports horizontal replicas (FR6).IMessageBroker.getTopicLag(Kafka: high-watermark minus committed offsets; RabbitMQ: queue ready count via idempotentassertQueue).Security
contactIdsprevents double-send on a malformed page.Test plan
npm run typecheck— cleannpm test -- src/runners/campaign-sender— 30 tests, all 6 ACs covered literally (batch SENT+sent_at, already-sent skip, mid-batch pause abort, 4xx FAILED, subscribe on boot, metrics/lag)npm test -- src/shared/broker src/runners/campaign-packer— regression greennpm run dev:campaign-senderjoins groupcampaign-sender-campaigns.sendwith all 12 partitions assigned and stays alive (stub previously exited 0)campaigns.controller.spec.tsstop/accountId) — unrelated, reproduces on clean checkoutChanged Files
src/runners/campaign-sender/— module,campaigns-send.consumer.ts,campaign-sender.service.ts,batch-dispatcher.service.ts, errors, specssrc/shared/broker/interfaces/message-broker.interface.ts+ Kafka/RabbitMQ adapters — additivegetTopicLagsrc/main.ts,src/app.module.ts— RUN_MODE wire-upDeferred ACs
Linked Issue