Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config ./test/jest-e2e.json",
"test:contract": "BROKER_CONTRACT=1 jest --runInBand broker.contract.spec",
"broker:provision-topics": "ts-node --transpile-only -r tsconfig-paths/register scripts/broker-provision-topics.ts",
"lint:tenant-db": "node scripts/check-tenant-db-context.mjs",
"migration:generate": "typeorm-ts-node-commonjs migration:generate -d src/database/ormconfig.ts",
"migration:create": "typeorm-ts-node-commonjs migration:create",
Expand Down
72 changes: 72 additions & 0 deletions scripts/broker-provision-topics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Deploy-time topic provisioning (EVO-1200 / story 1.7).
*
* Reads BROKER_TYPE, boots a minimal Nest context with the real BrokerModule,
* and idempotently provisions the 7 canonical pipeline topics through the
* active adapter (Kafka topics / RabbitMQ exchanges + default queues). Run on
* the first deploy of evo-flow to a fresh cluster, before any pipeline mode
* starts. Safe to run repeatedly.
*
* Usage:
* BROKER_TYPE=kafka npm run broker:provision-topics
* BROKER_TYPE=rabbitmq npm run broker:provision-topics
*/
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { NestFactory } from '@nestjs/core';
import { BrokerModule } from '../src/shared/broker/broker.module';
import {
IMessageBroker,
IMESSAGE_BROKER,
} from '../src/shared/broker/interfaces/message-broker.interface';
import { ALL_CONTRACT_TOPIC_NAMES } from '../src/shared/broker/contracts/broker-topics';
import { EVENTS_RECEIVED_TOPIC_PREFIX } from '../src/shared/broker/contracts/events-received.contract';

@Module({
imports: [ConfigModule.forRoot({ isGlobal: true }), BrokerModule],
})
class ProvisionModule {}

// The 6 concrete contract topics + the events.received template root (its
// per-platform instances are created dynamically by the event-receiver).
const TOPICS: readonly string[] = [
...ALL_CONTRACT_TOPIC_NAMES,
EVENTS_RECEIVED_TOPIC_PREFIX,
];

async function main(): Promise<void> {
const brokerType = process.env.BROKER_TYPE;
if (!brokerType) {
console.error('BROKER_TYPE is required (kafka | rabbitmq).');
process.exit(1);
}

const app = await NestFactory.createApplicationContext(ProvisionModule, {
logger: ['error', 'warn'],
});
const broker = app.get<IMessageBroker>(IMESSAGE_BROKER);

try {
for (const topic of TOPICS) {
await broker.provisionTopic(topic);
console.log(`provisioned ${topic} (${brokerType})`);
}
console.log(`Provisioned ${TOPICS.length} topics on ${brokerType}.`);
} finally {
// Bound the graceful shutdown: the broker client's reconnect timers can
// delay app.close() indefinitely, and this is a one-shot script.
await Promise.race([
app.close().catch(() => undefined),
new Promise((resolve) => setTimeout(resolve, 5000)),
]);
}
}

main()
// Force exit: the broker client keeps the event loop alive even after
// app.close(), so a long-running script would otherwise never return.
.then(() => process.exit(0))
.catch((err) => {
console.error('Failed to provision topics:', err);
process.exit(1);
});
25 changes: 25 additions & 0 deletions src/shared/broker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,28 @@ needs no brokers:
> (not *success*) on PRs that don't touch the filtered paths. If you make
> `contract` a required check org-wide, pair it with a `paths-filter` step or a
> always-runs shim so unrelated PRs aren't blocked.

## Deploy: provisioning topics (EVO-1200)

Adapters create topics lazily on first publish/subscribe, but production should
provision the broker topology explicitly. Run this **once on the first deploy
to a fresh cluster, before any pipeline mode starts** (idempotent — safe to
re-run):

```bash
BROKER_TYPE=kafka npm run broker:provision-topics
BROKER_TYPE=rabbitmq npm run broker:provision-topics
```

It boots a minimal Nest context with `BrokerModule` and calls
`IMessageBroker.provisionTopic` for the 7 canonical topics
(`ALL_CONTRACT_TOPIC_NAMES` + the `events.received` template root):

- **Kafka** — `admin.createTopics` (idempotent; `TOPIC_ALREADY_EXISTS` ignored).
- **RabbitMQ** — a durable `topic` exchange per name + a default durable queue
(declared, **not bound**). Consumers bind their own `${runMode}-${topic}`
queue on subscribe; binding the default queue here would make it accumulate a
copy of every message with no consumer to drain it.

Per-platform `events.received.<platform>` topics stay dynamic — the
event-receiver creates them at runtime.
19 changes: 19 additions & 0 deletions src/shared/broker/adapters/kafka-broker.adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,25 @@ describe('KafkaBrokerAdapter', () => {
await close();
});

it('provisionTopic creates the topic via admin.createTopics', async () => {
const { adapter, close } = await buildAdapter({
BROKER_TYPE: 'kafka',
KAFKA_BROKERS: 'localhost:9092',
});
await (
adapter as unknown as { onModuleInit: () => Promise<void> }
).onModuleInit();

await adapter.provisionTopic('campaigns.pack');

expect(lastKafka().admin.createTopics).toHaveBeenCalledTimes(1);
const call = (
lastKafka().admin.createTopics.mock.calls[0] as unknown[]
)[0] as CreateTopicsCall;
expect(call.topics[0].topic).toBe('campaigns.pack');
await close();
});

it('treats "topic already exists" as success and caches it', async () => {
const { adapter, close } = await buildAdapter({
BROKER_TYPE: 'kafka',
Expand Down
5 changes: 5 additions & 0 deletions src/shared/broker/adapters/kafka-broker.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ export class KafkaBrokerAdapter
});
}

async provisionTopic(topic: string): Promise<void> {
this.assertActive('provisionTopic');
await this.ensureTopicExists(topic);
}

async ack(msg: BrokerMessage): Promise<void> {
const handle = this.pendingAcks.get(msg);
if (!handle) {
Expand Down
27 changes: 27 additions & 0 deletions src/shared/broker/adapters/rabbitmq-broker.adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,33 @@ describe('RabbitMQBrokerAdapter', () => {
await close();
});

it('provisionTopic declares the exchange + durable queue but does NOT bind it', async () => {
const { adapter, close } = await buildAdapter({
BROKER_TYPE: 'rabbitmq',
RABBITMQ_URL: 'amqp://admin:admin@rabbit:5672',
});
await (
adapter as unknown as { onModuleInit: () => Promise<void> }
).onModuleInit();

await adapter.provisionTopic('campaigns.pack');

const ch = lastConn().channel;
expect(ch.assertExchange).toHaveBeenCalledWith(
'campaigns.pack',
'topic',
{
durable: true,
},
);
expect(ch.assertQueue).toHaveBeenCalledWith('campaigns.pack', {
durable: true,
});
// Unbound on purpose: a bound default queue would accumulate copies.
expect(ch.bindQueue).not.toHaveBeenCalled();
await close();
});

it('throws when called before onModuleInit (dormant adapter)', async () => {
const { adapter, close } = await buildAdapter({
BROKER_TYPE: 'kafka',
Expand Down
10 changes: 10 additions & 0 deletions src/shared/broker/adapters/rabbitmq-broker.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ export class RabbitMQBrokerAdapter
});
}

async provisionTopic(topic: string): Promise<void> {
this.assertActive('provisionTopic');
await this.ensureExchange(topic);
// Declare the durable queue but do NOT bind it. A bound, never-drained
// default queue would accumulate a copy of every message — the real
// consumer uses its own `${runMode}-${topic}` queue and binds it on
// subscribe. Provisioning only guarantees the exchange + queue exist.
await this.channel!.assertQueue(topic, { durable: true });
}

async ack(msg: BrokerMessage): Promise<void> {
const handle = this.pendingAcks.get(msg);
if (!handle) {
Expand Down
6 changes: 6 additions & 0 deletions src/shared/broker/interfaces/message-broker.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ export interface IMessageBroker {
): Promise<void>;
ack(msg: BrokerMessage): Promise<void>;
nack(msg: BrokerMessage, requeue?: boolean): Promise<void>;
/**
* Idempotently create a topic's broker-side topology (Kafka topic; RabbitMQ
* exchange + default durable queue + binding) ahead of any publish/subscribe,
* for explicit deploy-time provisioning (EVO-1200). Safe to call repeatedly.
*/
provisionTopic(topic: string): Promise<void>;
}

export const IMESSAGE_BROKER: unique symbol = Symbol('IMessageBroker');
Loading