diff --git a/src/shared/broker/adapters/kafka-broker.adapter.spec.ts b/src/shared/broker/adapters/kafka-broker.adapter.spec.ts index 48aca23..b819101 100644 --- a/src/shared/broker/adapters/kafka-broker.adapter.spec.ts +++ b/src/shared/broker/adapters/kafka-broker.adapter.spec.ts @@ -598,15 +598,38 @@ describe('KafkaBrokerAdapter', () => { await close(); }); - it('rejects subscribing twice to the same topic', async () => { - const { adapter, close } = await subscribeAndCapture({ + it('gives a repeat subscription its own group and both consumers receive (EVO-1737)', async () => { + const { adapter, received, consumer, close } = await subscribeAndCapture({ BROKER_TYPE: 'kafka', RUN_MODE: 'event-process', }); + const received2: unknown[] = []; await expect( - adapter.subscribe('events-topic', () => Promise.resolve()), - ).rejects.toThrow(/already has a consumer registered/); + adapter.subscribe('events-topic', (msg) => { + received2.push(msg); + return Promise.resolve(); + }), + ).resolves.toBeUndefined(); + + // Distinct groups → both coexist in one process (single-mode broadcast). + expect(lastKafka().consumerGroupIds).toEqual([ + 'event-process-events-topic', + 'event-process-events-topic-2', + ]); + + // Each consumer delivers to its own handler — both receive independently. + const consumer2 = lastKafka().consumers[1]; + const message = { + topic: 'events-topic', + partition: 0, + message: { offset: '1', value: Buffer.from('{"a":1}'), headers: {} }, + }; + await consumer.__triggerMessage!(message); + await consumer2.__triggerMessage!(message); + + expect(received).toHaveLength(1); + expect(received2).toHaveLength(1); await close(); }); diff --git a/src/shared/broker/adapters/kafka-broker.adapter.ts b/src/shared/broker/adapters/kafka-broker.adapter.ts index b196306..92634da 100644 --- a/src/shared/broker/adapters/kafka-broker.adapter.ts +++ b/src/shared/broker/adapters/kafka-broker.adapter.ts @@ -71,6 +71,9 @@ export class KafkaBrokerAdapter private producer: Producer | null = null; private admin: Admin | null = null; private readonly consumers = new Map(); + // Per-topic subscription count. Repeat subscriptions to the same topic each + // get a distinct consumer group so they coexist in one process (EVO-1737). + private readonly topicSubscriptions = new Map(); private readonly pendingAcks = new WeakMap(); private readonly ensuredTopics = new Set(); private deliveryLimit = 3; @@ -102,18 +105,19 @@ export class KafkaBrokerAdapter async onModuleDestroy(): Promise { if (!this.active) return; - for (const [topic, consumer] of this.consumers.entries()) { + for (const [consumerKey, consumer] of this.consumers.entries()) { try { await consumer.disconnect(); } catch (err) { this.writeStructured('warn', 'broker.shutdown.consumer_failed', { broker: BROKER_LABEL, - topic, + consumerKey, error: (err as Error).message, }); } } this.consumers.clear(); + this.topicSubscriptions.clear(); try { await this.producer?.disconnect(); @@ -172,15 +176,27 @@ export class KafkaBrokerAdapter ): Promise { this.assertActive('subscribe'); - if (this.consumers.has(topic)) { + await this.ensureTopicExists(topic); + + // Single mode runs every runner in one process, so a topic consumed by two + // runners (e.g. campaigns.control: packer + sender, EVO-1222) would otherwise + // collide on one consumer group. Give each repeat subscription its own group + // so both receive every message — the same broadcast the distributed + // deployment already gets from separate per-runner groups. EVO-1737. + // The "-N" suffix is positional (module load order), so a group name is not + // guaranteed stable across restarts; acceptable because repeat-subscribed + // topics are broadcast control topics whose Postgres flag stays authoritative. + const baseGroupId = `${this.resolveRunMode(topic)}-${topic}`; + const priorSubs = this.topicSubscriptions.get(topic) ?? 0; + const groupId = + priorSubs === 0 ? baseGroupId : `${baseGroupId}-${priorSubs + 1}`; + + if (this.consumers.has(groupId)) { throw new Error( - `KafkaBrokerAdapter already has a consumer registered for topic "${topic}".`, + `KafkaBrokerAdapter already has a consumer registered for group "${groupId}".`, ); } - - await this.ensureTopicExists(topic); - - const groupId = `${this.resolveRunMode(topic)}-${topic}`; + this.topicSubscriptions.set(topic, priorSubs + 1); const consumer = this.kafka!.consumer({ groupId }); consumer.on(consumer.events.CRASH, (event) => { @@ -202,7 +218,7 @@ export class KafkaBrokerAdapter this.dispatchMessage(consumer, payload, handler), }); - this.consumers.set(topic, consumer); + this.consumers.set(groupId, consumer); this.writeStructured('info', 'broker.subscribe', { broker: BROKER_LABEL, topic,