Skip to content

Commit 3a533e0

Browse files
committed
add new test
1 parent e60c0eb commit 3a533e0

File tree

2 files changed

+42
-6
lines changed

2 files changed

+42
-6
lines changed

core/src/main/scala/kafka/server/KafkaRequestHandler.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ class KafkaRequestHandler(
148148
if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)
149149
originalRequest.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())
150150
threadCurrentRequest.remove()
151+
if (!originalRequest.header.apiKey().forwardable)
152+
originalRequest.releaseBuffer()
151153
}
152154

153155
case request: RequestChannel.Request =>
@@ -163,9 +165,8 @@ class KafkaRequestHandler(
163165
case e: Throwable => error("Exception when handling request", e)
164166
} finally {
165167
threadCurrentRequest.remove()
166-
if (!request.header.apiKey().forwardable) {
168+
if (!request.header.apiKey().forwardable)
167169
request.releaseBuffer()
168-
}
169170
}
170171

171172
case RequestChannel.WakeupRequest =>

core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala

+39-4
Original file line numberDiff line numberDiff line change
@@ -231,16 +231,23 @@ class KafkaRequestHandlerTest {
231231
})
232232
}
233233

234-
def makeRequest(time: Time, metrics: RequestChannelMetrics): RequestChannel.Request = {
234+
def makeRequest(
235+
time: Time,
236+
metrics: RequestChannelMetrics,
237+
apiKeys: ApiKeys = ApiKeys.API_VERSIONS,
238+
version: Short = 0,
239+
buffer: ByteBuffer = ByteBuffer.allocate(0),
240+
memoryPool: MemoryPool = mock(classOf[MemoryPool])
241+
): RequestChannel.Request = {
235242
// Make unsupported API versions request to avoid having to parse a real request
236243
val requestHeader = mock(classOf[RequestHeader])
237-
when(requestHeader.apiKey()).thenReturn(ApiKeys.API_VERSIONS)
238-
when(requestHeader.apiVersion()).thenReturn(0.toShort)
244+
when(requestHeader.apiKey()).thenReturn(apiKeys)
245+
when(requestHeader.apiVersion()).thenReturn(version)
239246

240247
val context = new RequestContext(requestHeader, "0", mock(classOf[InetAddress]), new KafkaPrincipal("", ""),
241248
new ListenerName(""), SecurityProtocol.PLAINTEXT, mock(classOf[ClientInformation]), false)
242249
new RequestChannel.Request(0, context, time.nanoseconds(),
243-
mock(classOf[MemoryPool]), ByteBuffer.allocate(0), metrics)
250+
memoryPool, buffer, metrics)
244251
}
245252

246253
def setupBrokerTopicMetrics(systemRemoteStorageEnabled: Boolean = true): BrokerTopicMetrics = {
@@ -699,4 +706,32 @@ class KafkaRequestHandlerTest {
699706
// cleanup
700707
brokerTopicStats.close()
701708
}
709+
710+
@Test
711+
def testRequestBufferRelease(): Unit = {
712+
val time = new MockTime()
713+
val metrics = new RequestChannelMetrics(Collections.emptySet[ApiKeys])
714+
val requestChannel = new RequestChannel(10, time, metrics)
715+
val apiHandler = mock(classOf[ApiRequestHandler])
716+
val memoryPool = mock(classOf[MemoryPool])
717+
val buffer = ByteBuffer.allocate(1024)
718+
719+
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
720+
721+
val request = makeRequest(time, metrics, ApiKeys.PRODUCE, 3, buffer, memoryPool)
722+
requestChannel.sendRequest(request)
723+
724+
val shutdownThread = new Thread(() => {
725+
try {
726+
Thread.sleep(1000)
727+
requestChannel.sendShutdownRequest()
728+
} catch {
729+
case _: InterruptedException =>
730+
}
731+
})
732+
733+
shutdownThread.start()
734+
handler.run()
735+
verify(memoryPool, times(1)).release(buffer)
736+
}
702737
}

0 commit comments

Comments
 (0)