diff --git a/package.json b/package.json index 8729996..9af78e5 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand", "test:e2e": "jest --config ./test/jest-e2e.json", "test:contract": "BROKER_CONTRACT=1 jest --runInBand broker.contract.spec", + "broker:provision-topics": "ts-node --transpile-only -r tsconfig-paths/register scripts/broker-provision-topics.ts", "lint:tenant-db": "node scripts/check-tenant-db-context.mjs", "migration:generate": "typeorm-ts-node-commonjs migration:generate -d src/database/ormconfig.ts", "migration:create": "typeorm-ts-node-commonjs migration:create", diff --git a/scripts/broker-provision-topics.ts b/scripts/broker-provision-topics.ts new file mode 100644 index 0000000..fe32bce --- /dev/null +++ b/scripts/broker-provision-topics.ts @@ -0,0 +1,72 @@ +/** + * Deploy-time topic provisioning (EVO-1200 / story 1.7). + * + * Reads BROKER_TYPE, boots a minimal Nest context with the real BrokerModule, + * and idempotently provisions the 7 canonical pipeline topics through the + * active adapter (Kafka topics / RabbitMQ exchanges + default queues). Run on + * the first deploy of evo-flow to a fresh cluster, before any pipeline mode + * starts. Safe to run repeatedly. + * + * Usage: + * BROKER_TYPE=kafka npm run broker:provision-topics + * BROKER_TYPE=rabbitmq npm run broker:provision-topics + */ +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { NestFactory } from '@nestjs/core'; +import { BrokerModule } from '../src/shared/broker/broker.module'; +import { + IMessageBroker, + IMESSAGE_BROKER, +} from '../src/shared/broker/interfaces/message-broker.interface'; +import { ALL_CONTRACT_TOPIC_NAMES } from '../src/shared/broker/contracts/broker-topics'; +import { EVENTS_RECEIVED_TOPIC_PREFIX } from '../src/shared/broker/contracts/events-received.contract'; + +@Module({ + imports: [ConfigModule.forRoot({ isGlobal: true }), BrokerModule], +}) +class ProvisionModule {} + +// The 6 concrete contract topics + the events.received template root (its +// per-platform instances are created dynamically by the event-receiver). +const TOPICS: readonly string[] = [ + ...ALL_CONTRACT_TOPIC_NAMES, + EVENTS_RECEIVED_TOPIC_PREFIX, +]; + +async function main(): Promise { + const brokerType = process.env.BROKER_TYPE; + if (!brokerType) { + console.error('BROKER_TYPE is required (kafka | rabbitmq).'); + process.exit(1); + } + + const app = await NestFactory.createApplicationContext(ProvisionModule, { + logger: ['error', 'warn'], + }); + const broker = app.get(IMESSAGE_BROKER); + + try { + for (const topic of TOPICS) { + await broker.provisionTopic(topic); + console.log(`provisioned ${topic} (${brokerType})`); + } + console.log(`Provisioned ${TOPICS.length} topics on ${brokerType}.`); + } finally { + // Bound the graceful shutdown: the broker client's reconnect timers can + // delay app.close() indefinitely, and this is a one-shot script. + await Promise.race([ + app.close().catch(() => undefined), + new Promise((resolve) => setTimeout(resolve, 5000)), + ]); + } +} + +main() + // Force exit: the broker client keeps the event loop alive even after + // app.close(), so a long-running script would otherwise never return. + .then(() => process.exit(0)) + .catch((err) => { + console.error('Failed to provision topics:', err); + process.exit(1); + }); diff --git a/src/shared/broker/README.md b/src/shared/broker/README.md index 315320c..d060cb0 100644 --- a/src/shared/broker/README.md +++ b/src/shared/broker/README.md @@ -113,3 +113,28 @@ needs no brokers: > (not *success*) on PRs that don't touch the filtered paths. If you make > `contract` a required check org-wide, pair it with a `paths-filter` step or a > always-runs shim so unrelated PRs aren't blocked. + +## Deploy: provisioning topics (EVO-1200) + +Adapters create topics lazily on first publish/subscribe, but production should +provision the broker topology explicitly. Run this **once on the first deploy +to a fresh cluster, before any pipeline mode starts** (idempotent — safe to +re-run): + +```bash +BROKER_TYPE=kafka npm run broker:provision-topics +BROKER_TYPE=rabbitmq npm run broker:provision-topics +``` + +It boots a minimal Nest context with `BrokerModule` and calls +`IMessageBroker.provisionTopic` for the 7 canonical topics +(`ALL_CONTRACT_TOPIC_NAMES` + the `events.received` template root): + +- **Kafka** — `admin.createTopics` (idempotent; `TOPIC_ALREADY_EXISTS` ignored). +- **RabbitMQ** — a durable `topic` exchange per name + a default durable queue + (declared, **not bound**). Consumers bind their own `${runMode}-${topic}` + queue on subscribe; binding the default queue here would make it accumulate a + copy of every message with no consumer to drain it. + +Per-platform `events.received.` topics stay dynamic — the +event-receiver creates them at runtime. diff --git a/src/shared/broker/adapters/kafka-broker.adapter.spec.ts b/src/shared/broker/adapters/kafka-broker.adapter.spec.ts index 282663b..cb15454 100644 --- a/src/shared/broker/adapters/kafka-broker.adapter.spec.ts +++ b/src/shared/broker/adapters/kafka-broker.adapter.spec.ts @@ -331,6 +331,25 @@ describe('KafkaBrokerAdapter', () => { await close(); }); + it('provisionTopic creates the topic via admin.createTopics', async () => { + const { adapter, close } = await buildAdapter({ + BROKER_TYPE: 'kafka', + KAFKA_BROKERS: 'localhost:9092', + }); + await ( + adapter as unknown as { onModuleInit: () => Promise } + ).onModuleInit(); + + await adapter.provisionTopic('campaigns.pack'); + + expect(lastKafka().admin.createTopics).toHaveBeenCalledTimes(1); + const call = ( + lastKafka().admin.createTopics.mock.calls[0] as unknown[] + )[0] as CreateTopicsCall; + expect(call.topics[0].topic).toBe('campaigns.pack'); + await close(); + }); + it('treats "topic already exists" as success and caches it', async () => { const { adapter, close } = await buildAdapter({ BROKER_TYPE: 'kafka', diff --git a/src/shared/broker/adapters/kafka-broker.adapter.ts b/src/shared/broker/adapters/kafka-broker.adapter.ts index d275008..b517b96 100644 --- a/src/shared/broker/adapters/kafka-broker.adapter.ts +++ b/src/shared/broker/adapters/kafka-broker.adapter.ts @@ -188,6 +188,11 @@ export class KafkaBrokerAdapter }); } + async provisionTopic(topic: string): Promise { + this.assertActive('provisionTopic'); + await this.ensureTopicExists(topic); + } + async ack(msg: BrokerMessage): Promise { const handle = this.pendingAcks.get(msg); if (!handle) { diff --git a/src/shared/broker/adapters/rabbitmq-broker.adapter.spec.ts b/src/shared/broker/adapters/rabbitmq-broker.adapter.spec.ts index 87eaaa1..c4c68e2 100644 --- a/src/shared/broker/adapters/rabbitmq-broker.adapter.spec.ts +++ b/src/shared/broker/adapters/rabbitmq-broker.adapter.spec.ts @@ -333,6 +333,33 @@ describe('RabbitMQBrokerAdapter', () => { await close(); }); + it('provisionTopic declares the exchange + durable queue but does NOT bind it', async () => { + const { adapter, close } = await buildAdapter({ + BROKER_TYPE: 'rabbitmq', + RABBITMQ_URL: 'amqp://admin:admin@rabbit:5672', + }); + await ( + adapter as unknown as { onModuleInit: () => Promise } + ).onModuleInit(); + + await adapter.provisionTopic('campaigns.pack'); + + const ch = lastConn().channel; + expect(ch.assertExchange).toHaveBeenCalledWith( + 'campaigns.pack', + 'topic', + { + durable: true, + }, + ); + expect(ch.assertQueue).toHaveBeenCalledWith('campaigns.pack', { + durable: true, + }); + // Unbound on purpose: a bound default queue would accumulate copies. + expect(ch.bindQueue).not.toHaveBeenCalled(); + await close(); + }); + it('throws when called before onModuleInit (dormant adapter)', async () => { const { adapter, close } = await buildAdapter({ BROKER_TYPE: 'kafka', diff --git a/src/shared/broker/adapters/rabbitmq-broker.adapter.ts b/src/shared/broker/adapters/rabbitmq-broker.adapter.ts index c56a41a..15851ef 100644 --- a/src/shared/broker/adapters/rabbitmq-broker.adapter.ts +++ b/src/shared/broker/adapters/rabbitmq-broker.adapter.ts @@ -187,6 +187,16 @@ export class RabbitMQBrokerAdapter }); } + async provisionTopic(topic: string): Promise { + this.assertActive('provisionTopic'); + await this.ensureExchange(topic); + // Declare the durable queue but do NOT bind it. A bound, never-drained + // default queue would accumulate a copy of every message — the real + // consumer uses its own `${runMode}-${topic}` queue and binds it on + // subscribe. Provisioning only guarantees the exchange + queue exist. + await this.channel!.assertQueue(topic, { durable: true }); + } + async ack(msg: BrokerMessage): Promise { const handle = this.pendingAcks.get(msg); if (!handle) { diff --git a/src/shared/broker/interfaces/message-broker.interface.ts b/src/shared/broker/interfaces/message-broker.interface.ts index cea3556..8695781 100644 --- a/src/shared/broker/interfaces/message-broker.interface.ts +++ b/src/shared/broker/interfaces/message-broker.interface.ts @@ -19,6 +19,12 @@ export interface IMessageBroker { ): Promise; ack(msg: BrokerMessage): Promise; nack(msg: BrokerMessage, requeue?: boolean): Promise; + /** + * Idempotently create a topic's broker-side topology (Kafka topic; RabbitMQ + * exchange + default durable queue + binding) ahead of any publish/subscribe, + * for explicit deploy-time provisioning (EVO-1200). Safe to call repeatedly. + */ + provisionTopic(topic: string): Promise; } export const IMESSAGE_BROKER: unique symbol = Symbol('IMessageBroker');