Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
files="ClientUtils.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest).java"/>
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest|ClientTelemetryReporterTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
Expand Down Expand Up @@ -528,13 +529,13 @@ public void handleResponse(PushTelemetryResponse response) {
/**
 * Callback for a failed GetTelemetrySubscriptions request: logs the supplied exception at
 * debug level and passes the outcome on to {@code handleFailedRequest}.
 *
 * NOTE(review): this excerpt comes from a rendered diff hunk, so the two consecutive
 * handleFailedRequest(...) lines below are presumably the removed and the added version of a
 * single statement rather than two real calls -- confirm against the actual file.
 *
 * @param maybeFatalException the error reported for the request; the name suggests it may or
 *                            may not be fatal -- TODO confirm whether null is permitted
 */
@Override
public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) {
log.debug("The broker generated an error for the get telemetry network API request", maybeFatalException);
handleFailedRequest(maybeFatalException != null);
handleFailedRequest(isRetryable(maybeFatalException));
}

@Override
public void handleFailedPushTelemetryRequest(KafkaException maybeFatalException) {
log.debug("The broker generated an error for the push telemetry network API request", maybeFatalException);
handleFailedRequest(maybeFatalException != null);
handleFailedRequest(isRetryable(maybeFatalException));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, the consumer will log the following warning even when users are not concerned with telemetry:

[2025-10-16 02:27:16,077] WARN Received unrecoverable error from broker, disabling telemetry (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)

Perhaps we shouldn't print the warning if the error is an UnsupportedVersionException. What do you think?

}

@Override
Expand Down Expand Up @@ -628,6 +629,12 @@ public void initiateClose() {
}
}

/**
 * Decides whether a failed telemetry request should be treated as retryable.
 *
 * @param maybeFatalException the error reported for the request, or {@code null} if none was supplied
 * @return {@code true} when no exception was supplied, when the exception itself is a
 *         {@link RetriableException}, or when its direct cause is one; {@code false} otherwise
 */
private boolean isRetryable(final KafkaException maybeFatalException) {
    if (maybeFatalException == null) {
        return true;
    }
    if (maybeFatalException instanceof RetriableException) {
        return true;
    }
    // Only the immediate cause is inspected; instanceof is false for a null cause,
    // so no separate null check is required.
    final Throwable directCause = maybeFatalException.getCause();
    return directCause instanceof RetriableException;
}

private Optional<Builder<?>> createSubscriptionRequest(ClientTelemetrySubscription localSubscription) {
/*
If we've previously retrieved a subscription, it will contain the client instance ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
Expand Down Expand Up @@ -901,6 +906,163 @@ public void testTelemetryReporterInitiateCloseAlreadyInTerminatedStates() {
.telemetrySender()).state());
}

/**
 * A {@link TimeoutException} reported for a GetTelemetrySubscriptions request is expected to
 * return the sender to SUBSCRIPTION_NEEDED with the default push interval and telemetry still enabled.
 */
@Test
public void testHandleFailedGetTelemetrySubscriptionsRequestWithRetriableException() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: a subscription request is in flight.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));

    // Act: the request fails with a timeout.
    final KafkaException timeout = new TimeoutException("Request timed out");
    sender.handleFailedGetTelemetrySubscriptionsRequest(timeout);

    // Assert: the sender will try again at the default interval.
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, sender.intervalMs());
    assertTrue(sender.enabled());
}

/**
 * A plain {@link KafkaException} whose cause is a {@link DisconnectException} is expected to be
 * handled like a retriable failure: back to SUBSCRIPTION_NEEDED, default interval, still enabled.
 */
@Test
public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedRetriableException() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: a subscription request is in flight.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));

    // Act: the failure carries the disconnect only as its cause.
    final Throwable disconnect = new DisconnectException("Connection lost");
    final KafkaException wrapper = new KafkaException(disconnect);
    sender.handleFailedGetTelemetrySubscriptionsRequest(wrapper);

    // Assert
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, sender.intervalMs());
    assertTrue(sender.enabled());
}

/**
 * An {@link AuthorizationException} on a GetTelemetrySubscriptions request is expected to
 * disable telemetry: state SUBSCRIPTION_NEEDED, interval {@code Integer.MAX_VALUE}, not enabled.
 */
@Test
public void testHandleFailedGetTelemetrySubscriptionsRequestWithFatalException() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: a subscription request is in flight.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));

    // Act: the request fails with an authorization error.
    final KafkaException notAuthorized = new AuthorizationException("Not authorized for telemetry");
    sender.handleFailedGetTelemetrySubscriptionsRequest(notAuthorized);

    // Assert: telemetry is switched off.
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertEquals(Integer.MAX_VALUE, sender.intervalMs());
    assertFalse(sender.enabled());
}

/**
 * A {@link KafkaException} wrapping an {@link UnsupportedVersionException} is expected to
 * disable telemetry: state SUBSCRIPTION_NEEDED, interval {@code Integer.MAX_VALUE}, not enabled.
 */
@Test
public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedFatalException() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: a subscription request is in flight.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));

    // Act: the unsupported-version error is present only as the cause.
    final Throwable unsupported = new UnsupportedVersionException("Broker doesn't support telemetry");
    final KafkaException wrapper = new KafkaException("Version check failed", unsupported);
    sender.handleFailedGetTelemetrySubscriptionsRequest(wrapper);

    // Assert: telemetry is switched off.
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertEquals(Integer.MAX_VALUE, sender.intervalMs());
    assertFalse(sender.enabled());
}

/**
 * A {@link NetworkException} on a PushTelemetry request is expected to send the sender back to
 * SUBSCRIPTION_NEEDED with the default push interval while telemetry stays enabled.
 */
@Test
public void testHandleFailedPushTelemetryRequestWithRetriableException() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: walk the state machine up to an in-flight push.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
    assertTrue(sender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
    assertTrue(sender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));

    // Act: the push fails with a network error.
    final KafkaException networkFailure = new NetworkException("Network failure");
    sender.handleFailedPushTelemetryRequest(networkFailure);

    // Assert
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, sender.intervalMs());
    assertTrue(sender.enabled());
}

/**
 * An {@link AuthorizationException} on a PushTelemetry request is expected to disable
 * telemetry: state SUBSCRIPTION_NEEDED, interval {@code Integer.MAX_VALUE}, not enabled.
 */
@Test
public void testHandleFailedPushTelemetryRequestWithFatalException() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: walk the state machine up to an in-flight push.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
    assertTrue(sender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
    assertTrue(sender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));

    // Act: the push fails with an authorization error.
    final KafkaException notAuthorized = new AuthorizationException("Not authorized to push telemetry");
    sender.handleFailedPushTelemetryRequest(notAuthorized);

    // Assert: telemetry is switched off.
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertEquals(Integer.MAX_VALUE, sender.intervalMs());
    assertFalse(sender.enabled());
}

/**
 * A {@link TimeoutException} whose cause is a {@link DisconnectException} is expected to be
 * handled as retriable: back to SUBSCRIPTION_NEEDED, default interval, still enabled.
 */
@Test
public void testHandleFailedRequestWithMultipleRetriableExceptionsInChain() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: a subscription request is in flight.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));

    // Act: both the outer exception and its cause are supplied.
    final Throwable innerDisconnect = new DisconnectException("Inner disconnect");
    final KafkaException outerTimeout = new TimeoutException("Outer timeout", innerDisconnect);
    sender.handleFailedGetTelemetrySubscriptionsRequest(outerTimeout);

    // Assert
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, sender.intervalMs());
    assertTrue(sender.enabled());
}

/**
 * A bare {@link KafkaException} with no cause is expected to disable telemetry:
 * state SUBSCRIPTION_NEEDED, interval {@code Integer.MAX_VALUE}, not enabled.
 */
@Test
public void testHandleFailedRequestWithGenericKafkaException() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: a subscription request is in flight.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));

    // Act: the failure is a generic exception without a cause.
    final KafkaException unknownError = new KafkaException("Unknown error");
    sender.handleFailedGetTelemetrySubscriptionsRequest(unknownError);

    // Assert: telemetry is switched off.
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertEquals(Integer.MAX_VALUE, sender.intervalMs());
    assertFalse(sender.enabled());
}

/**
 * A push failure reported while the sender is in TERMINATING_PUSH_NEEDED is expected to leave
 * that state untouched and keep telemetry enabled.
 */
@Test
public void testHandleFailedRequestDuringTermination() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

    // Arrange: move the sender into the terminating state.
    sender.updateSubscriptionResult(subscription, time.milliseconds());
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
    assertTrue(sender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
    assertTrue(sender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));

    // Act: a push failure arrives during termination.
    final KafkaException timeout = new TimeoutException("Timeout");
    sender.handleFailedPushTelemetryRequest(timeout);

    // Assert: the terminating state is preserved.
    assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED, sender.state());
    assertTrue(sender.enabled());
}

/**
 * Feeds three consecutive GetTelemetrySubscriptions failures to the same sender. After the
 * {@link TimeoutException} and the {@link DisconnectException} the sender is expected to remain
 * enabled; after the final {@link UnsupportedVersionException} it is expected to be disabled.
 * Each round must end in SUBSCRIPTION_NEEDED.
 */
@Test
public void testSequentialFailuresWithDifferentExceptionTypes() {
    final ClientTelemetryReporter.DefaultClientTelemetrySender sender =
        (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
    sender.updateSubscriptionResult(subscription, time.milliseconds());

    // Rounds one and two: failures after which telemetry must stay enabled.
    final KafkaException[] recoverableFailures = {
        new TimeoutException("Timeout 1"),
        new DisconnectException("Disconnect")
    };
    for (final KafkaException failure : recoverableFailures) {
        assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        sender.handleFailedGetTelemetrySubscriptionsRequest(failure);
        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
        assertTrue(sender.enabled());
    }

    // Round three: a failure after which telemetry must be disabled.
    assertTrue(sender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
    final KafkaException unsupported = new UnsupportedVersionException("Version not supported");
    sender.handleFailedGetTelemetrySubscriptionsRequest(unsupported);
    assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, sender.state());
    assertFalse(sender.enabled());
}

@AfterEach
public void tearDown() {
clientTelemetryReporter.close();
Expand Down