Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ private[server] class ExpiringErrorCache(maxSize: Int, time: Time) {
}
}

/**
 * Reports whether an unexpired error entry exists for the given topic.
 *
 * @param topicName     topic to look up in `byTopic`
 * @param currentTimeMs timestamp the entry's expiration time is compared against
 * @return true only when an entry is present and its `expirationTimeMs` is strictly
 *         greater than `currentTimeMs`
 */
def hasError(topicName: String, currentTimeMs: Long): Boolean =
  // Option(...) turns a missing (null) lookup result into None, so `exists` yields false for it.
  Option(byTopic.get(topicName)).exists(_.expirationTimeMs > currentTimeMs)

def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long): Map[String, String] = {
val result = mutable.Map.empty[String, String]
topicNames.foreach { topicName =>
Expand Down Expand Up @@ -173,8 +178,22 @@ class DefaultAutoTopicCreationManager(
requestContext: RequestContext,
timeoutMs: Long
): Unit = {
if (topics.nonEmpty) {
sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext), timeoutMs)
if (topics.isEmpty) {
return
}

val currentTimeMs = time.milliseconds()

// Filter out topics that are:
// 1. Already in error cache (back-off period)
// 2. Already in-flight (concurrent request)
val topicsToCreate = topics.filter { case (topicName, _) =>
!topicCreationErrorCache.hasError(topicName, currentTimeMs) &&
inflightTopics.add(topicName)
}

if (topicsToCreate.nonEmpty) {
sendCreateTopicRequestWithErrorCaching(topicsToCreate, Some(requestContext), timeoutMs)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import org.mockito.Mockito.never

import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._

class AutoTopicCreationManagerTest {

Expand Down Expand Up @@ -903,4 +904,225 @@ class AutoTopicCreationManagerTest {
assertTrue(!cachedErrors.contains("test-topic-1"), "test-topic-1 should have been evicted")
assertTrue(!cachedErrors.contains("test-topic-2"), "test-topic-2 should have been evicted")
}

/**
 * A topic whose creation attempt failed, and whose error is still cached, must not
 * cause a second request to be sent to the controller.
 */
@Test
def testTopicsInBackoffAreNotRetried(): Unit = {
  val topicName = "test-topic"

  autoTopicCreationManager = new DefaultAutoTopicCreationManager(
    config,
    brokerToController,
    groupCoordinator,
    transactionCoordinator,
    shareCoordinator,
    mockTime,
    topicErrorCacheCapacity = testCacheCapacity)

  val creatableTopics = Map(
    topicName -> new CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)
  )
  val context = initializeRequestContextWithUserPrincipal()
  val requestTimeoutMs = 5000L

  // Initial attempt: exactly one request goes out; grab its completion handler.
  autoTopicCreationManager.createStreamsInternalTopics(creatableTopics, context, requestTimeoutMs)

  val handlerCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
  Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
    any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
    handlerCaptor.capture())

  // Complete the request with a per-topic error so that the error gets cached.
  val failureData = new org.apache.kafka.common.message.CreateTopicsResponseData()
  failureData.topics().add(
    new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
      .setName(topicName)
      .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
      .setErrorMessage("Invalid replication factor"))

  val failureResponse = new ClientResponse(
    new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1),
    null, null, 0, 0, false, null, null,
    new CreateTopicsResponse(failureData))

  handlerCaptor.getValue.onComplete(failureResponse)

  // The error must now be visible through the error lookup.
  val errorsAfterFailure =
    autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set(topicName), mockTime.milliseconds())
  assertEquals(1, errorsAfterFailure.size)

  // Retry while the error is still cached: nothing new may be sent.
  autoTopicCreationManager.createStreamsInternalTopics(creatableTopics, context, requestTimeoutMs)

  // Total number of requests is still one.
  Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
    any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
    any(classOf[ControllerRequestCompletionHandler]))
}

/**
 * After the cached error for a topic has expired, a new creation attempt for that
 * topic must result in a second request being sent.
 */
@Test
def testTopicsOutOfBackoffCanBeRetried(): Unit = {
  val topicName = "test-topic"
  // Passed as the request timeout; the test also uses it as the span after which the error expires.
  val ttlMs = 1000L

  autoTopicCreationManager = new DefaultAutoTopicCreationManager(
    config,
    brokerToController,
    groupCoordinator,
    transactionCoordinator,
    shareCoordinator,
    mockTime,
    topicErrorCacheCapacity = testCacheCapacity)

  val creatableTopics = Map(
    topicName -> new CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)
  )
  val context = initializeRequestContextWithUserPrincipal()

  // Initial attempt: one request is sent; capture its completion handler.
  autoTopicCreationManager.createStreamsInternalTopics(creatableTopics, context, ttlMs)

  val handlerCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
  Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
    any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
    handlerCaptor.capture())

  // Fail the request so an error is recorded for the topic.
  val failedTopic = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
    .setName(topicName)
    .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
    .setErrorMessage("Invalid replication factor")
  val failureData = new org.apache.kafka.common.message.CreateTopicsResponseData()
  failureData.topics().add(failedTopic)

  val responseHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
  val failureResponse = new ClientResponse(responseHeader, null, null,
    0, 0, false, null, null, new CreateTopicsResponse(failureData))

  handlerCaptor.getValue.onComplete(failureResponse)

  // Immediately afterwards the error is reported.
  val errorsBeforeExpiry =
    autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set(topicName), mockTime.milliseconds())
  assertEquals(1, errorsBeforeExpiry.size)

  // Move the mock clock past the TTL.
  mockTime.sleep(ttlMs + 100)

  // The error is no longer reported.
  val errorsAfterExpiry =
    autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set(topicName), mockTime.milliseconds())
  assertTrue(errorsAfterExpiry.isEmpty, "Error should be expired after TTL")

  // A fresh attempt is now allowed through.
  autoTopicCreationManager.createStreamsInternalTopics(creatableTopics, context, ttlMs)

  // Two requests in total: the original one and the retry.
  Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
    any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
    any(classOf[ControllerRequestCompletionHandler]))
}

/**
 * While a creation request for a topic has been sent but not completed, a second
 * call for the same topic must not send another request.
 */
@Test
def testInflightTopicsAreNotRetriedConcurrently(): Unit = {
  // Asserts the total number of sendRequest invocations seen by the mock so far.
  def verifySentRequests(expectedCount: Int): Unit =
    Mockito.verify(brokerToController, Mockito.times(expectedCount)).sendRequest(
      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
      any(classOf[ControllerRequestCompletionHandler]))

  autoTopicCreationManager = new DefaultAutoTopicCreationManager(
    config,
    brokerToController,
    groupCoordinator,
    transactionCoordinator,
    shareCoordinator,
    mockTime,
    topicErrorCacheCapacity = testCacheCapacity)

  val creatableTopics = Map(
    "test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
  )
  val context = initializeRequestContextWithUserPrincipal()
  val requestTimeoutMs = 5000L

  // First call: the request is sent; its handler is never completed in this test.
  autoTopicCreationManager.createStreamsInternalTopics(creatableTopics, context, requestTimeoutMs)
  verifySentRequests(1)

  // Second call for the same topic while the first is still outstanding: nothing new is sent.
  autoTopicCreationManager.createStreamsInternalTopics(creatableTopics, context, requestTimeoutMs)
  verifySentRequests(1)
}

/**
 * When one call names a topic with a cached error, a topic with an outstanding
 * request and an untouched topic, only the untouched topic may appear in the
 * request that is sent.
 */
@Test
def testBackoffAndInflightInteraction(): Unit = {
  // Builds a single-partition, replication-factor-1 topic definition.
  def creatable(name: String): CreatableTopic =
    new CreatableTopic().setName(name).setNumPartitions(1).setReplicationFactor(1)

  autoTopicCreationManager = new DefaultAutoTopicCreationManager(
    config,
    brokerToController,
    groupCoordinator,
    transactionCoordinator,
    shareCoordinator,
    mockTime,
    topicErrorCacheCapacity = testCacheCapacity)

  val backoffTopic = creatable("backoff-topic")
  val inflightTopic = creatable("inflight-topic")
  val normalTopic = creatable("normal-topic")
  val allTopics = Map(
    "backoff-topic" -> backoffTopic,
    "inflight-topic" -> inflightTopic,
    "normal-topic" -> normalTopic
  )
  val context = initializeRequestContextWithUserPrincipal()
  val requestTimeoutMs = 5000L

  // Step 1: request backoff-topic on its own and capture the completion handler.
  autoTopicCreationManager.createStreamsInternalTopics(
    Map("backoff-topic" -> backoffTopic), context, requestTimeoutMs)

  val handlerCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
  Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
    any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
    handlerCaptor.capture())

  // Step 2: fail that request so backoff-topic gets an error recorded.
  val failureData = new org.apache.kafka.common.message.CreateTopicsResponseData()
  failureData.topics().add(
    new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
      .setName("backoff-topic")
      .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
      .setErrorMessage("Invalid replication factor"))

  val failureResponse = new ClientResponse(
    new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1),
    null, null, 0, 0, false, null, null,
    new CreateTopicsResponse(failureData))

  handlerCaptor.getValue.onComplete(failureResponse)

  // Step 3: request inflight-topic and leave that request uncompleted.
  autoTopicCreationManager.createStreamsInternalTopics(
    Map("inflight-topic" -> inflightTopic), context, requestTimeoutMs)

  Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
    any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
    any(classOf[ControllerRequestCompletionHandler]))

  // Step 4: ask for all three topics in one call.
  autoTopicCreationManager.createStreamsInternalTopics(allTopics, context, requestTimeoutMs)

  // Three requests overall: backoff-topic, inflight-topic, and the combined call.
  val builderCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
  Mockito.verify(brokerToController, Mockito.times(3)).sendRequest(
    builderCaptor.capture(),
    any(classOf[ControllerRequestCompletionHandler]))

  // Step 5: unwrap the most recent envelope and inspect the CreateTopics request inside it.
  val envelope = builderCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder]
    .build(ApiKeys.ENVELOPE.latestVersion())
  val payload = envelope.requestData().duplicate()
  val innerHeader = RequestHeader.parse(payload)
  val innerRequest = CreateTopicsRequest.parse(
    new org.apache.kafka.common.protocol.ByteBufferAccessor(payload),
    innerHeader.apiVersion())

  val requestedNames = innerRequest.data().topics().asScala.map(_.name()).toSet
  assertEquals(1, requestedNames.size, "Only normal-topic should be created")
  assertTrue(requestedNames.contains("normal-topic"), "normal-topic should be in the request")
  assertTrue(!requestedNames.contains("backoff-topic"), "backoff-topic should be filtered (in back-off)")
  assertTrue(!requestedNames.contains("inflight-topic"), "inflight-topic should be filtered (in-flight)")
}
}