Skip to content

Commit cf623d8

Browse files
committed
Do not deduplicate topic creations and deletions
As of now creating and deleting multiple topics in a row will ignore most of the topics except the first one. This is due to deduplication that is done based on operation alone without considering the target topics. This change incorporates topic data into the deduplication to avoid the aforementioned issue. on-behalf-of: @SAP [email protected]
1 parent 12cb67e commit cf623d8

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

src/clients/admin/admin.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ export class Admin extends Base<AdminOptions> {
465465
}
466466

467467
this[kPerformDeduplicated](
468-
'createTopics',
468+
`createTopics-${options.topics.join(',')}`,
469469
deduplicateCallback => {
470470
this[kPerformWithRetry](
471471
'createTopics',
@@ -527,7 +527,7 @@ export class Admin extends Base<AdminOptions> {
527527

528528
#deleteTopics (options: DeleteTopicsOptions, callback: CallbackWithPromise<void>): void {
529529
this[kPerformDeduplicated](
530-
'deleteTopics',
530+
`deleteTopics-${options.topics.join(',')}`,
531531
deduplicateCallback => {
532532
this[kPerformWithRetry](
533533
'deleteTopics',

test/clients/admin/admin.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,25 @@ test('createTopics using assignments', async t => {
458458
await admin.deleteTopics({ topics: [topicName] })
459459
})
460460

461+
test('createTopics should not deduplicate creation of different topics', async t => {
462+
const admin = createAdmin(t)
463+
464+
const topicNames = [`test-topic-${randomUUID()}`, `test-topic-${randomUUID()}`]
465+
466+
await Promise.all(
467+
topicNames.map(topicName =>
468+
admin.createTopics({
469+
topics: [topicName]
470+
}))
471+
)
472+
473+
const topicMetadata = await admin.metadata({ topics: topicNames })
474+
strictEqual(topicMetadata.topics.has(topicNames[0]), true)
475+
strictEqual(topicMetadata.topics.has(topicNames[1]), true)
476+
477+
await admin.deleteTopics({ topics: topicNames })
478+
})
479+
461480
test('createTopics should validate options in strict mode', async t => {
462481
const admin = createAdmin(t, { strict: true })
463482

@@ -617,6 +636,43 @@ test('deleteTopics should delete a topic and support diagnostic channels', async
617636
await admin.deleteTopics({ topics: [topicName] })
618637
})
619638

639+
test('deleteTopics should not deduplicate deletion of different topics', async t => {
640+
const admin = createAdmin(t)
641+
642+
const topicNames = [`test-topic-${randomUUID()}`, `test-topic-${randomUUID()}`]
643+
644+
admin.createTopics({ topics: topicNames })
645+
646+
const topicMetadata = await admin.metadata({ topics: topicNames })
647+
strictEqual(topicMetadata.topics.has(topicNames[0]), true)
648+
strictEqual(topicMetadata.topics.has(topicNames[1]), true)
649+
650+
await Promise.all(
651+
topicNames.map(topicName =>
652+
admin.deleteTopics({
653+
topics: [topicName]
654+
}))
655+
)
656+
657+
// Deletion needs some time to propagate, retry a few times
658+
await retry(15, 500, async () => {
659+
try {
660+
await admin.metadata({ topics: [topicNames[0]] })
661+
throw Error('Topic still exists: ' + topicNames[0])
662+
} catch (error) {
663+
// ApiCode 3 = UnknownTopicOrPartition
664+
ok(error.findBy?.('apiCode', 3))
665+
}
666+
try {
667+
await admin.metadata({ topics: [topicNames[1]] })
668+
throw Error('Topic still exists: ' + topicNames[1])
669+
} catch (error) {
670+
// ApiCode 3 = UnknownTopicOrPartition
671+
ok(error.findBy?.('apiCode', 3))
672+
}
673+
})
674+
})
675+
620676
test('deleteTopics should validate options in strict mode', async t => {
621677
const admin = createAdmin(t, { strict: true })
622678

0 commit comments

Comments
 (0)