From c231f07798bbdcfe0cefaada2c13a728aaf9ccef Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 7 Oct 2025 21:32:03 +0800 Subject: [PATCH 1/5] fix null value and error message --- .../main/java/org/apache/kafka/streams/CloseOptions.java | 6 +++++- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java index 25be8530a4c13..73b0ea4132f99 100644 --- a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java +++ b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java @@ -81,7 +81,11 @@ public static CloseOptions groupMembershipOperation(final GroupMembershipOperati * @return this {@code CloseOptions} instance. */ public CloseOptions withTimeout(final Duration timeout) { - this.timeout = Optional.ofNullable(timeout); + if (timeout == null) { + this.timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE)); + } else { + this.timeout = Optional.of(timeout); + } return this; } diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 999befa349535..dfcfbaf94e224 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1635,7 +1635,7 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException { Objects.requireNonNull(options, "options cannot be null"); final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options); - final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout"); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout().get(), "timeout"); final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix); if (timeoutMs < 0) { throw new IllegalArgumentException("Timeout can't be negative."); From 0b6f7ba0a974b2e18c7a21c18cc0a000c02bb578 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 7 Oct 2025 22:15:58 +0800 Subject: [PATCH 2/5] addressed by comments --- .../main/java/org/apache/kafka/streams/CloseOptions.java | 6 +----- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 5 +++-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java index 73b0ea4132f99..25be8530a4c13 100644 --- a/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java +++ b/streams/src/main/java/org/apache/kafka/streams/CloseOptions.java @@ -81,11 +81,7 @@ public static CloseOptions groupMembershipOperation(final GroupMembershipOperati * @return this {@code CloseOptions} instance. */ public CloseOptions withTimeout(final Duration timeout) { - if (timeout == null) { - this.timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE)); - } else { - this.timeout = Optional.of(timeout); - } + this.timeout = Optional.ofNullable(timeout); return this; } diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index dfcfbaf94e224..fad08e243b73b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1635,8 +1635,9 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException { Objects.requireNonNull(options, "options cannot be null"); final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options); - final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout().get(), "timeout"); - final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix); + final Duration timeout = optionsInternal.timeout().orElse(Duration.ofMillis(Long.MAX_VALUE)); + final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); + final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); if (timeoutMs < 0) { throw new IllegalArgumentException("Timeout can't be negative."); } From ead86648a84033f88280b893dc5fa928b4b763e6 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 9 Oct 2025 08:24:28 +0800 Subject: [PATCH 3/5] use orElseGet to late init --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index fad08e243b73b..99c36429a5ded 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1635,7 +1635,7 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException { Objects.requireNonNull(options, "options cannot be null"); final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options); - final Duration timeout = optionsInternal.timeout().orElse(Duration.ofMillis(Long.MAX_VALUE)); + final Duration timeout = optionsInternal.timeout().orElseGet(() -> Duration.ofMillis(Long.MAX_VALUE)); final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); if (timeoutMs < 0) { From ce65797c60f19cb62e09b9bb291cbc26cdf40159 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 11 Oct 2025 09:34:26 +0800 Subject: [PATCH 4/5] add unit test for null timeout --- .../apache/kafka/streams/KafkaStreams.java | 3 ++- .../kafka/streams/KafkaStreamsTest.java | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 99c36429a5ded..68abb913612f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1523,7 +1523,8 @@ private Thread shutdownHelper( }, clientId + "-CloseThread"); } - private boolean close(final Optional timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) { + // visible for testing + boolean close(final Optional timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) { final long timeoutMs; if (timeout.isPresent()) { timeoutMs = timeout.get(); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 1ceebab1cd72a..ced5ee462d996 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -120,6 +120,7 @@ import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.isA; import static org.mockito.Mockito.mock; @@ -1297,6 +1298,28 @@ public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupTrue() } } + @Test + @SuppressWarnings("unchecked") + public void shouldNotThrowOnNullTimeoutForCloseWithCloseOptionLeaveGroupTrue() throws Exception { + prepareStreams(); + prepareStreamThread(streamThreadOne, 1); + prepareStreamThread(streamThreadTwo, 2); + prepareTerminableThread(streamThreadOne); + + final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class); + when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient); + + final CloseOptions closeOptions = CloseOptions.timeout(null) + .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP); + final KafkaStreams streams = spy(new KafkaStreamsWithTerminableThread( + getBuilderWithSource().build(), props, mockClientSupplier, time)); + + doReturn(false).when(streams).close(any(Optional.class), any()); + streams.close(closeOptions); + + verify(streams).close(eq(Optional.of(Long.MAX_VALUE)), eq(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)); + } + @Test public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws Exception { prepareStreams(); From 111388d5aed44819cdd33323df21a59fb6deea30 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 11 Oct 2025 09:36:08 +0800 Subject: [PATCH 5/5] update the test name --- .../test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index ced5ee462d996..83eeb0befc603 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -1300,7 +1300,7 @@ public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupTrue() @Test @SuppressWarnings("unchecked") - public void shouldNotThrowOnNullTimeoutForCloseWithCloseOptionLeaveGroupTrue() throws Exception { + public void shouldUseDefaultTimeoutForCloseWithNullTimeout() throws Exception { prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2);