fix(broker): give repeat same-topic subscriptions distinct consumer groups (EVO-1737)#64
Conversation
…roups (EVO-1737) In single mode all runners load in one process, so a topic consumed by two runners (campaigns.control: packer + sender, EVO-1222) collided on one consumer group and crashed the boot with "already has a consumer registered for topic". Give each repeat subscription to the same topic its own consumer group (base, then -2, -3…) so both coexist and receive every message — the same broadcast the distributed deployment already gets from separate per-runner groups. The dispatch/ack hot path is untouched; the counter is cleared on shutdown. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Reviewer's GuideKafkaBrokerAdapter now supports multiple subscriptions to the same topic within a single process by assigning unique consumer group IDs per subscription, and tests have been updated to assert the new behavior. Sequence diagram for repeat topic subscription group assignmentsequenceDiagram
actor Module
participant KafkaBrokerAdapter
participant topicSubscriptions
participant consumers
Module->>KafkaBrokerAdapter: subscribe("events-topic", handler1)
KafkaBrokerAdapter->>KafkaBrokerAdapter: ensureTopicExists("events-topic")
KafkaBrokerAdapter->>topicSubscriptions: get("events-topic") = 0
KafkaBrokerAdapter->>KafkaBrokerAdapter: baseGroupId = resolveRunMode("events-topic") + "-events-topic"
KafkaBrokerAdapter->>KafkaBrokerAdapter: groupId = baseGroupId
KafkaBrokerAdapter->>consumers: has(groupId) = false
KafkaBrokerAdapter->>topicSubscriptions: set("events-topic", 1)
KafkaBrokerAdapter->>consumers: set(groupId, consumer1)
Module->>KafkaBrokerAdapter: subscribe("events-topic", handler2)
KafkaBrokerAdapter->>KafkaBrokerAdapter: ensureTopicExists("events-topic")
KafkaBrokerAdapter->>topicSubscriptions: get("events-topic") = 1
KafkaBrokerAdapter->>KafkaBrokerAdapter: baseGroupId = resolveRunMode("events-topic") + "-events-topic"
KafkaBrokerAdapter->>KafkaBrokerAdapter: groupId = baseGroupId + "-2"
KafkaBrokerAdapter->>consumers: has(groupId) = false
KafkaBrokerAdapter->>topicSubscriptions: set("events-topic", 2)
KafkaBrokerAdapter->>consumers: set(groupId, consumer2)
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've left some high level feedback:
- Consider including the topic (and perhaps run mode) in the duplicate-consumer error message instead of only the groupId, as debugging collisions will usually start from the topic name rather than the derived group string.
- The
topicSubscriptionscounter is only ever incremented and cleared on module destroy; if you later introduce unsubscribe-style behavior or partial subscription failures, it may be worth aligning the counter with actual active consumers to avoid unbounded suffix growth or mismatches.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider including the topic (and perhaps run mode) in the duplicate-consumer error message instead of only the groupId, as debugging collisions will usually start from the topic name rather than the derived group string.
- The `topicSubscriptions` counter is only ever incremented and cleared on module destroy; if you later introduce unsubscribe-style behavior or partial subscription failures, it may be worth aligning the counter with actual active consumers to avoid unbounded suffix growth or mismatches.Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
dpaes
left a comment
There was a problem hiding this comment.
✅ Approved — EVO-1737
Reviewed against the 3 ACs (dev:single boots both packer+sender with no campaigns.control double-registration; distributed unchanged; pause/stop/resume honored in single mode).
Core correctness — the crux is right: the fix gives each repeat same-topic subscription its own consumer group (<base>, then <base>-2) — group-per-handler, not a second consumer in the same group. Distinct Kafka groups each receive all messages (broadcast), so both packer and sender control consumers get every pause/stop/resume in single mode. This matches the fan-out distributed mode already gets from separate per-runner groups.
Verified:
- First subscription untouched — base group keeps its original name (only repeats get the
-Nsuffix); no committed-offset reset on deploy. Asserted by the new + existing tests. - Distributed mode unchanged (AC#2) — the per-topic counter is per-adapter-instance in-process state; separate processes never collide and never get a suffix.
- Dispatch/ack hot path untouched — only the map key changed (
topic→groupId). - Test proves fan-out — the new test asserts two distinct groups and that both consumers independently receive a message (not merely that group names differ).
- Disclosed limitations (
getTopicLagbase-group-only;subscribePatterndup-prefix) confirmed non-biting today (onlycampaigns.controlhits this path).
Non-blocking follow-up: RabbitMQBrokerAdapter.subscribe still throws on a duplicate topic, so BROKER_TYPE=rabbitmq npm run dev:single would hit the same crash class. Default dev is Kafka and this card scopes the Kafka crash — worth a scope note / follow-up to mirror the fix on the RabbitMQ adapter.
Caveat: evo-flow CI runs only Sourcery — "23/23, typecheck clean, dev:single boots" is self-reported.
No blockers. Approving and merging (squash).
Summary
npm run dev:singlecrashed on boot withKafkaBrokerAdapter already has a consumer registered for topic "campaigns.control". In single mode every runner loads in one process, so theCampaignsControlConsumerregistered in bothcampaign-packer.module.tsandcampaign-sender.module.ts(EVO-1222 — correct by design for the distributed deployment) collided on one consumer group.Fix:
KafkaBrokerAdapter.subscribenow gives each repeat subscription to the same topic its own consumer group (<group>, then<group>-2, …) so both consumers coexist and each receives every message — the same broadcast the distributed deployment already gets from separate per-runner groups. ThedispatchMessage/ackhot path is untouched; the per-topic counter is cleared on shutdown.Changes
kafka-broker.adapter.ts: per-topic subscription counter → distinct group for repeat subscriptions; map keyed by group; counter cleared inonModuleDestroy.kafka-broker.adapter.spec.ts: replaced the old "rejects subscribing twice" test with one asserting two distinct groups and that both consumers receive independently.Validation
evo-flow: npm run typecheck→ clean (exit 0)evo-flow: npx jest src/shared/broker/adapters/kafka-broker.adapter.spec.ts→ 23/23 pass (incl. new EVO-1737 test)evo-flow: npm run dev:single→ reaches🎯 Service ready in SINGLE mode; bothcampaigns.controlconsumers join distinct groups (single-campaigns.control+single-campaigns.control-2); 0 double-registration crashes.Acceptance Criteria
dev:singleboots with both packer and sender active — no crash.Known limitations (out of scope — noted from code review)
getTopicLag(topic)measures only the base group${runMode}-${topic}; lag of a repeat-subscription group (…-2) is not surfaced. Only affectscampaigns.controltoday (low-volume control fast-path).subscribePatternstill rejects a duplicate prefix — the same single-mode collision class would apply if two runners ever subscribed the same wildcard prefix (no current case).-Nsuffix is positional (module load order), so group names are not guaranteed stable across restarts; acceptable because repeat-subscribed topics are broadcast control topics with an authoritative Postgres flag.Linked Issue
🤖 Generated with Claude Code
Summary by Sourcery
Allow multiple consumers in a single process to subscribe to the same Kafka topic by assigning distinct consumer groups to repeat subscriptions.
Bug Fixes:
Enhancements:
Tests: