feat(broker): broker:provision-topics deploy script + provisionTopic (EVO-1200)#31
Merged
Conversation
Public, idempotent topic provisioning: Kafka via admin.createTopics (TOPIC_ALREADY_EXISTS ignored); RabbitMQ via assertExchange + a default durable queue bound with `<topic>.#` (so the events.received exchange also catches the dynamic per-platform routing keys). Touches src/shared/broker/** — guarded by the contract-suite gate. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Standalone script boots a minimal Nest context with BrokerModule and provisions the 7 canonical topics through the active adapter. Idempotent; bounded graceful shutdown then force-exit (the broker client keeps the loop alive otherwise). README documents when to run it in the deploy flow. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ion (EVO-1200)
Self-review MEDIUM: binding the provisioned default queue to its topic exchange
would make it accumulate a copy of every message (the real consumer uses its
own ${runMode}-${topic} queue), filling the broker disk in production. Declare
the durable queue but do not bind it — consumers bind on subscribe.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Sorry @nickoliveira23, you have reached your weekly rate limit of 500000 diff characters.
Please try again later or upgrade to continue using Sourcery
dpaes
approved these changes
Jun 9, 2026
dpaes
left a comment
There was a problem hiding this comment.
Review — Approved ✅
Read both adapters, the interface, the deploy script, the topic contracts, and the new specs.
ACs:
- AC1 (Kafka, 7 topics) — MET. Script provisions
[...ALL_CONTRACT_TOPIC_NAMES (6), 'events.received']= 7 viaadmin.createTopics. - AC2 (RabbitMQ, 7 exchanges + default durable queues) — MET. The crux safety call is correct:
ensureExchange+assertQueue(durable:true)with nobindQueue— the default queue is declared but intentionally unbound, so it can't accumulate undrained copies and fill disk. The onlybindQueuein the file isattachConsumerbinding the consumer's own${runMode}-${topic}queue on subscribe. The spec assertsbindQueueis NOT called. - AC3 (idempotent re-run) — MET by adapter logic (Kafka swallows only
/already exists/iand caches; RabbitMQassert*are AMQP no-ops +declaredExchangesguard). The Kafka catch is correctly scoped — every non-"already exists" error rethrows, so a real provisioning failure (auth/connect/invalid config) propagates toprocess.exit(1), no false-green. Connection lifecycle is bounded (Promise.race([app.close(), 5s])+ forced exit), so no hung deploy step.
Non-blocking notes (LOW/NIT):
IMessageBroker.provisionTopicJSDoc still saysRabbitMQ exchange + default durable queue + binding— that+ bindingcontradicts the deliberate no-bind design (impl + README + spec all confirm no binding). Worth dropping+ bindingfrom the JSDoc so a future maintainer doesn't "fix" the adapter and reintroduce the disk-fill bug.- AC3 re-run is correct but exercised by no test (the new specs are single-call topology only);
events.enriched/ bareevents.receivedprovision inert/orphan topology; Kafka matches "already exists" by message string, not error code.
CI caveat: evo-flow runs only Sourcery (no jest gate), so "specs pass" is author-self-reported. The specs are well-formed and would prove the topology if run — worth a local npm test.
Merging to develop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Explicit deploy-time topic provisioning (story 1.7) so production no longer relies on lazy runtime topic creation. Adds a public
provisionTopicto the broker abstraction and a one-shot script that provisions the 7 canonical pipeline topics through the active adapter. Closes Epic 1 (Foundation).IMessageBroker.provisionTopic(topic)— idempotent. Kafka:admin.createTopics(TOPIC_ALREADY_EXISTSignored). RabbitMQ: durabletopicexchange + a default durable queue, declared but not bound (consumers bind their own${runMode}-${topic}queue on subscribe; a bound default queue would accumulate a copy of every message).scripts/broker-provision-topics.ts+npm run broker:provision-topics— boots a minimal Nest context withBrokerModule, provisionsALL_CONTRACT_TOPIC_NAMES+ theevents.receivedtemplate root (7 topics), bounded graceful shutdown then exits.Security
Test plan
evo-flow: npm run typecheck/npm run lint— cleanevo-flow: npm test -- src/shared/broker/adapters— 48 unit specs pass (incl. provisionTopic for both adapters; RabbitMQ asserts the queue is declared unbound)evo-flow: BROKER_TYPE=kafka npm run broker:provision-topics— 1st + 2nd run both "Provisioned 7 topics", exit 0 (idempotent)evo-flow: BROKER_TYPE=rabbitmq npm run broker:provision-topics— same; verified 7 exchanges + 7 queues, no topic→queue bindingsChanged Files
src/shared/broker/interfaces/message-broker.interface.tssrc/shared/broker/adapters/{kafka,rabbitmq}-broker.adapter.ts(+ specs)scripts/broker-provision-topics.tspackage.json,src/shared/broker/README.mdQA Notes (self-review, fixed)
Notes
src/shared/broker/**→ the EVO-1199 contract-suite gate runs on this PR. Minor merge overlap with EVO-1208 feat(event-process): event-process consumer + wildcard subscribe for events.received.* (EVO-1208) #28 (different adapter methods).Linked Issue
🤖 Generated with Claude Code