Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/shared/broker/adapters/kafka-broker.adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,15 +598,38 @@ describe('KafkaBrokerAdapter', () => {
await close();
});

it('rejects subscribing twice to the same topic', async () => {
const { adapter, close } = await subscribeAndCapture({
it('gives a repeat subscription its own group and both consumers receive (EVO-1737)', async () => {
const { adapter, received, consumer, close } = await subscribeAndCapture({
BROKER_TYPE: 'kafka',
RUN_MODE: 'event-process',
});

const received2: unknown[] = [];
await expect(
adapter.subscribe('events-topic', () => Promise.resolve()),
).rejects.toThrow(/already has a consumer registered/);
adapter.subscribe('events-topic', (msg) => {
received2.push(msg);
return Promise.resolve();
}),
).resolves.toBeUndefined();

// Distinct groups → both coexist in one process (single-mode broadcast).
expect(lastKafka().consumerGroupIds).toEqual([
'event-process-events-topic',
'event-process-events-topic-2',
]);

// Each consumer delivers to its own handler — both receive independently.
const consumer2 = lastKafka().consumers[1];
const message = {
topic: 'events-topic',
partition: 0,
message: { offset: '1', value: Buffer.from('{"a":1}'), headers: {} },
};
await consumer.__triggerMessage!(message);
await consumer2.__triggerMessage!(message);

expect(received).toHaveLength(1);
expect(received2).toHaveLength(1);
await close();
});

Expand Down
34 changes: 25 additions & 9 deletions src/shared/broker/adapters/kafka-broker.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ export class KafkaBrokerAdapter
private producer: Producer | null = null;
private admin: Admin | null = null;
private readonly consumers = new Map<string, Consumer>();
// Per-topic subscription count. Repeat subscriptions to the same topic each
// get a distinct consumer group so they coexist in one process (EVO-1737).
private readonly topicSubscriptions = new Map<string, number>();
private readonly pendingAcks = new WeakMap<BrokerMessage, AckHandle>();
private readonly ensuredTopics = new Set<string>();
private deliveryLimit = 3;
Expand Down Expand Up @@ -102,18 +105,19 @@ export class KafkaBrokerAdapter
async onModuleDestroy(): Promise<void> {
if (!this.active) return;

for (const [topic, consumer] of this.consumers.entries()) {
for (const [consumerKey, consumer] of this.consumers.entries()) {
try {
await consumer.disconnect();
} catch (err) {
this.writeStructured('warn', 'broker.shutdown.consumer_failed', {
broker: BROKER_LABEL,
topic,
consumerKey,
error: (err as Error).message,
});
}
}
this.consumers.clear();
this.topicSubscriptions.clear();

try {
await this.producer?.disconnect();
Expand Down Expand Up @@ -172,15 +176,27 @@ export class KafkaBrokerAdapter
): Promise<void> {
this.assertActive('subscribe');

if (this.consumers.has(topic)) {
await this.ensureTopicExists(topic);

// Single mode runs every runner in one process, so a topic consumed by two
// runners (e.g. campaigns.control: packer + sender, EVO-1222) would otherwise
// collide on one consumer group. Give each repeat subscription its own group
// so both receive every message — the same broadcast the distributed
// deployment already gets from separate per-runner groups. EVO-1737.
// The "-N" suffix is positional (module load order), so a group name is not
// guaranteed stable across restarts; acceptable because repeat-subscribed
// topics are broadcast control topics whose Postgres flag stays authoritative.
const baseGroupId = `${this.resolveRunMode(topic)}-${topic}`;
const priorSubs = this.topicSubscriptions.get(topic) ?? 0;
const groupId =
priorSubs === 0 ? baseGroupId : `${baseGroupId}-${priorSubs + 1}`;

if (this.consumers.has(groupId)) {
throw new Error(
`KafkaBrokerAdapter already has a consumer registered for topic "${topic}".`,
`KafkaBrokerAdapter already has a consumer registered for group "${groupId}".`,
);
}

await this.ensureTopicExists(topic);

const groupId = `${this.resolveRunMode(topic)}-${topic}`;
this.topicSubscriptions.set(topic, priorSubs + 1);
const consumer = this.kafka!.consumer({ groupId });

consumer.on(consumer.events.CRASH, (event) => {
Expand All @@ -202,7 +218,7 @@ export class KafkaBrokerAdapter
this.dispatchMessage(consumer, payload, handler),
});

this.consumers.set(topic, consumer);
this.consumers.set(groupId, consumer);
this.writeStructured('info', 'broker.subscribe', {
broker: BROKER_LABEL,
topic,
Expand Down
Loading