diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java index b29a44493287..d982a29889ec 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java @@ -310,6 +310,7 @@ public List verify(final ConfigurationContext configur final Properties consumerProperties = getConsumerProperties(configurationContext, clientProperties); consumerProperties.putAll(variables); try (final Admin admin = Admin.create(consumerProperties)) { + // Verify topic listing final ListTopicsResult listTopicsResult = admin.listTopics(); final KafkaFuture> requestedListings = listTopicsResult.listings(); @@ -322,6 +323,11 @@ public List verify(final ConfigurationContext configur .explanation(topicListingExplanation) .build() ); + + // Verify cluster connectivity and node reachability + final String bootstrapServers = configurationContext.getProperty(BOOTSTRAP_SERVERS).getValue(); + final KafkaClusterVerifier clusterVerifier = new KafkaClusterVerifier(VERIFY_TIMEOUT, verificationLogger); + results.addAll(clusterVerifier.verifyClusterConnectivity(admin, bootstrapServers)); } catch (final Exception e) { verificationLogger.error("Kafka Broker verification failed", e); results.add( diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/KafkaClusterVerifier.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/KafkaClusterVerifier.java new file mode 100644 index 000000000000..0e2551b22f56 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/KafkaClusterVerifier.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service; + +import org.apache.commons.io.function.IOTriConsumer; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SKIPPED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; + +/** + * Verifies Kafka cluster connectivity and node reachability. + */ +class KafkaClusterVerifier { + + private static final String CLUSTER_DESCRIPTION_STEP = "Kafka Cluster Description"; + private static final String NODE_CONFIG_CHECK_STEP = "Kafka Node Reachability"; + private static final String BOOTSTRAP_REACHABILITY_STEP = "Bootstrap Server Reachability"; + + private final Duration verifyTimeout; + private final ComponentLog logger; + private final IOTriConsumer socketConnector; + + KafkaClusterVerifier(final Duration verifyTimeout, final ComponentLog logger) { + this.verifyTimeout = verifyTimeout; + this.logger = logger; + this.socketConnector = KafkaClusterVerifier::defaultReachServer; + } + + // visible for testing + KafkaClusterVerifier(final Duration verifyTimeout, final ComponentLog logger, final IOTriConsumer socketConnector) { + this.verifyTimeout = verifyTimeout; + this.logger = logger; + this.socketConnector = socketConnector; + } + + /** + * Verifies cluster connectivity by describing the cluster and checking node connectivity. + * Falls back to bootstrap server connectivity check if cluster description fails due to permissions. + * + * @param admin Kafka Admin client + * @param bootstrapServers Bootstrap servers configuration string + * @return List of verification results + */ + List verifyClusterConnectivity(final Admin admin, final String bootstrapServers) { + final List results = new ArrayList<>(); + + // Use virtual thread executor for lightweight parallel execution + try (final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + final ClusterNodeResult clusterNodeResult = describeCluster(admin); + results.add(clusterNodeResult.result()); + + if (clusterNodeResult.nodes() != null) { + // Check each node's configuration + results.addAll(verifyNodesReachability(clusterNodeResult.nodes(), executor)); + + } else { + // Fallback to checking bootstrap servers + results.addAll(verifyBootstrapServersReachability(bootstrapServers, executor)); + } + } + + return results; + } + + private record ClusterNodeResult(Collection nodes, ConfigVerificationResult result) { } + + private ClusterNodeResult describeCluster(final Admin admin) { + Collection nodes = null; + ConfigVerificationResult configVerificationResult; + try { + // Try to describe cluster + final DescribeClusterResult describeClusterResult = admin.describeCluster(new DescribeClusterOptions().timeoutMs((int) verifyTimeout.toMillis())); + final KafkaFuture> nodesFuture = describeClusterResult.nodes(); + nodes = nodesFuture.get(); + + final String clusterExplanation = "Cluster Nodes Found [%d]".formatted(nodes.size()); + configVerificationResult = + new ConfigVerificationResult.Builder() + .verificationStepName(CLUSTER_DESCRIPTION_STEP) + .outcome(SUCCESSFUL) + .explanation(clusterExplanation) + .build(); + + } catch (final Exception e) { + + if (isCausedByAuthorizationException(e)) { + logger.warn("Describe Cluster insufficient permissions", e); + configVerificationResult = + new ConfigVerificationResult.Builder() + .verificationStepName(CLUSTER_DESCRIPTION_STEP) + .outcome(SKIPPED) + .explanation("Insufficient permissions to describe cluster.") + .build(); + + } else { + logger.error("Describe Cluster failed", e); + configVerificationResult = + new ConfigVerificationResult.Builder() + .verificationStepName(CLUSTER_DESCRIPTION_STEP) + .outcome(SKIPPED) + .explanation("Cluster description failed: " + e) + .build(); + } + } + + return new ClusterNodeResult(nodes, configVerificationResult); + } + + private boolean isCausedByAuthorizationException(final Exception e) { + Throwable cause = e; + while (!(cause instanceof AuthorizationException || cause == null)) { + cause = cause.getCause(); + } + return cause != null; + } + + /** + * Verifies each node's reachability by attempting socket connections in parallel using virtual threads. + * Returns a ConfigVerificationResult for each individual node. + * + * @param nodes Collection of cluster nodes + * @return List of verification results, one per node + */ + private List verifyNodesReachability(final Collection nodes, final ExecutorService executor) { + final List results = new ArrayList<>(); + + if (nodes.isEmpty()) { + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(NODE_CONFIG_CHECK_STEP) + .outcome(FAILED) + .explanation("No nodes found in cluster") + .build() + ); + return results; + } + + // Submit parallel tasks to verify each node's reachability + final List verificationContexts = new ArrayList<>(); + for (final Node node : nodes) { + final Callable task = () -> verifyNodeReachability(node); + verificationContexts.add(new NodeVerificationContext(node, executor.submit(task))); + } + + // Collect results from all parallel tasks + for (final NodeVerificationContext nodeVerificationContext : verificationContexts) { + try { + results.add(nodeVerificationContext.resultFuture().get()); + } catch (final Exception e) { + final Node node = nodeVerificationContext.node(); + logger.warn("Node {} ({}:{}) reachability check result collection exception", node.id(), node.host(), node.port(), e); + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(verifyNodeStepName(node)) + .outcome(FAILED) + .explanation("Task execution failed: %s".formatted(e)) + .build() + ); + } + } + + return results; + } + + private record NodeVerificationContext(Node node, Future resultFuture) { } + + /** + * Verifies a single node's reachability using socket connection. + * + * @param node The Kafka node being verified + * @return ConfigVerificationResult indicating success or failure + */ + private ConfigVerificationResult verifyNodeReachability(final Node node) { + final ConfigVerificationResult.Builder stepBuilder = new ConfigVerificationResult.Builder() + .verificationStepName(verifyNodeStepName(node)); + + try { + reachServer(node.host(), node.port()); + return stepBuilder + .outcome(SUCCESSFUL) + .explanation("Node is reachable") + .build(); + + } catch (final Exception e) { + logger.warn("Node {} ({}:{}) is not reachable", node.id(), node.host(), node.port(), e); + return stepBuilder + .outcome(FAILED) + .explanation("Connection failed: %s".formatted(e)) + .build(); + } + } + + private String verifyNodeStepName(final Node node) { + return "%s - Node %s (%s:%d)".formatted(NODE_CONFIG_CHECK_STEP, node.id(), node.host(), node.port()); + } + + /** + * Fallback method to verify bootstrap server reachability using socket connections in parallel using virtual threads. + * Returns a ConfigVerificationResult for each individual bootstrap server. + * + * @param bootstrapServers Bootstrap servers configuration string + * @return List of verification results, one per bootstrap server + */ + private List verifyBootstrapServersReachability(final String bootstrapServers, final ExecutorService executor) { + final List results = new ArrayList<>(); + + if (bootstrapServers == null || bootstrapServers.isBlank()) { + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(BOOTSTRAP_REACHABILITY_STEP) + .outcome(FAILED) + .explanation("No bootstrap servers configured") + .build() + ); + return results; + } + + final String[] servers = bootstrapServers.split(","); + final List validServers = new ArrayList<>(); + + // Filter out empty server entries + for (final String server : servers) { + final String trimmedServer = server.trim(); + if (!trimmedServer.isEmpty()) { + validServers.add(trimmedServer); + } + } + + if (validServers.isEmpty()) { + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(BOOTSTRAP_REACHABILITY_STEP) + .outcome(FAILED) + .explanation("No valid bootstrap servers configured") + .build() + ); + return results; + } + + // Submit parallel tasks to verify each bootstrap server + final List verificationContexts = new ArrayList<>(); + for (final String server : validServers) { + final Callable task = () -> verifyBootstrapServer(server); + verificationContexts.add(new BootstrapServerVerificationContext(server, executor.submit(task))); + } + + // Collect results from all parallel tasks + for (final BootstrapServerVerificationContext bootstrapServerVerificationContext : verificationContexts) { + try { + results.add(bootstrapServerVerificationContext.resultFuture().get()); + } catch (final Exception e) { + final String server = bootstrapServerVerificationContext.server(); + logger.warn("Bootstrap Server {} result collection exception", server, e); + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(verifyBootstrapServerStepName(server)) + .outcome(FAILED) + .explanation("Task execution failed: %s".formatted(e)) + .build() + ); + } + } + + return results; + } + + private record BootstrapServerVerificationContext(String server, Future resultFuture) { } + + /** + * Verifies a single bootstrap server's reachability. + * + * @param server Bootstrap server address + * @return ConfigVerificationResult indicating reachability status + */ + private ConfigVerificationResult verifyBootstrapServer(final String server) { + final ConfigVerificationResult.Builder stepBuilder = new ConfigVerificationResult.Builder() + .verificationStepName(verifyBootstrapServerStepName(server)); + + try { + // Remove protocol if present + String serverAddress = server; + if (serverAddress.contains("://")) { + serverAddress = serverAddress.substring(serverAddress.indexOf("://") + 3); + } + + final String[] parts = serverAddress.split(":"); + final String host = parts[0]; + final int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 9092; // Default Kafka port + + try { + reachServer(host, port); + return stepBuilder + .outcome(SUCCESSFUL) + .explanation("Bootstrap Server is reachable") + .build(); + } catch (final Exception e) { + logger.warn("Bootstrap Server {} is not reachable", server, e); + return stepBuilder + .outcome(FAILED) + .explanation("Connection failed: %s".formatted(e)) + .build(); + } + + } catch (final Exception e) { + logger.warn("Bootstrap Server {} reachability check failed", server, e); + return stepBuilder + .outcome(FAILED) + .explanation("Invalid format or error: %s".formatted(e)) + .build(); + } + } + + private String verifyBootstrapServerStepName(final String server) { + return "%s - %s".formatted(BOOTSTRAP_REACHABILITY_STEP, server); + } + + private void reachServer(final String host, final int port) throws IOException { + this.socketConnector.accept(host, port, verifyTimeout); + } + + private static void defaultReachServer(final String host, final int port, final Duration timeout) throws IOException { + try (final Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(host, port), (int) timeout.toMillis()); + } + } +} + diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java index d466580723d3..231ee143f468 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java @@ -78,6 +78,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -300,6 +301,15 @@ void testVerifySuccessful() { final ConfigVerificationResult firstResult = results.iterator().next(); assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, firstResult.getOutcome()); assertNotNull(firstResult.getExplanation()); + + final Optional clusterDescriptionStep = results.stream().filter(it -> "Kafka Cluster Description".equals(it.getVerificationStepName())).findFirst(); + assertTrue(clusterDescriptionStep.isPresent()); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, clusterDescriptionStep.get().getOutcome()); + + final Pattern brokerPattern = Pattern.compile(".*? Node \\d+ \\(.*:\\d+\\)"); + final Optional reachBrokerStep = results.stream().filter(it -> brokerPattern.matcher(it.getVerificationStepName()).matches()).findFirst(); + assertTrue(reachBrokerStep.isPresent()); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, reachBrokerStep.get().getOutcome()); } @Test diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/KafkaClusterVerifierTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/KafkaClusterVerifierTest.java new file mode 100644 index 000000000000..94eee1c365b4 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/KafkaClusterVerifierTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service; + +import org.apache.commons.io.function.IOTriConsumer; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.logging.ComponentLog; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; + +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SKIPPED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class KafkaClusterVerifierTest { + + private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(2); + private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093"; + + @Mock + private Admin admin; + + @Mock + private ComponentLog logger; + + @Mock + private DescribeClusterResult describeClusterResult; + + private IOTriConsumer socketConnector; + private KafkaClusterVerifier verifier; + + @BeforeEach + void setUp() { + socketConnector = (host, port, timeout) -> { + // Simulate successful connection by default + }; + verifier = new KafkaClusterVerifier(VERIFY_TIMEOUT, logger, (host, port, timeout) -> socketConnector.accept(host, port, timeout)); + } + + @Test + void testVerifyClusterConnectivitySuccess() { + final Node node1 = new Node(1, "broker1.example.com", 9092); + final Collection nodes = List.of(node1); + + final KafkaFuture> nodesFuture = KafkaFuture.completedFuture(nodes); + when(admin.describeCluster(any(DescribeClusterOptions.class))).thenReturn(describeClusterResult); + when(describeClusterResult.nodes()).thenReturn(nodesFuture); + + final List results = verifier.verifyClusterConnectivity(admin, BOOTSTRAP_SERVERS); + + assertEquals(2, results.size(), "Should have cluster description + 1 node result"); + + // Cluster description result + assertEquals(SUCCESSFUL, results.getFirst().getOutcome()); + assertTrue(results.getFirst().getExplanation().contains("Cluster Nodes Found [1]")); + + final boolean hasNodeResult = results.stream() + .anyMatch(r -> r.getVerificationStepName().contains("Node 1") + && r.getOutcome() == SUCCESSFUL + && r.getVerificationStepName().contains("broker1.example.com")); + assertTrue(hasNodeResult, "Should have a node verification result"); + } + + @Test + void testVerifyClusterConnectivityWithAuthorizationException() { + when(admin.describeCluster(any(DescribeClusterOptions.class))).thenThrow(new AuthorizationException("Unauthorized")); + + final List results = verifier.verifyClusterConnectivity(admin, BOOTSTRAP_SERVERS); + + assertFalse(results.isEmpty()); + + final ConfigVerificationResult clusterResult = results.stream() + .filter(r -> r.getVerificationStepName().contains("Cluster Description")) + .findFirst() + .orElseThrow(); + + assertEquals(SKIPPED, clusterResult.getOutcome(), "Should have skipped cluster description"); + assertTrue(clusterResult.getExplanation().contains("Insufficient permissions")); + + final long bootstrapResults = results.stream() + .filter(r -> r.getVerificationStepName().contains("Bootstrap Server Reachability")) + .count(); + + assertTrue(bootstrapResults > 0, "Should have bootstrap server results (fallback)"); + } + + @Test + void testVerifyClusterConnectivityWithGeneralException() { + when(admin.describeCluster(any(DescribeClusterOptions.class))).thenThrow(new TimeoutException("Connection timeout")); + + final List results = verifier.verifyClusterConnectivity(admin, BOOTSTRAP_SERVERS); + + assertFalse(results.isEmpty()); + + final ConfigVerificationResult clusterResult = results.stream() + .filter(r -> r.getVerificationStepName().contains("Cluster Description")) + .findFirst() + .orElseThrow(); + + assertEquals(SKIPPED, clusterResult.getOutcome(), "Should skip cluster description"); + assertTrue(clusterResult.getExplanation().contains("Cluster description failed")); + } + + @Test + void testVerifyClusterConnectivityWithEmptyNodes() { + final Collection nodes = List.of(); + final KafkaFuture> nodesFuture = KafkaFuture.completedFuture(nodes); + when(admin.describeCluster(any(DescribeClusterOptions.class))).thenReturn(describeClusterResult); + when(describeClusterResult.nodes()).thenReturn(nodesFuture); + + final List results = verifier.verifyClusterConnectivity(admin, BOOTSTRAP_SERVERS); + + assertTrue(results.size() >= 2); + + // Cluster description result + assertEquals(SUCCESSFUL, results.getFirst().getOutcome()); + assertTrue(results.getFirst().getExplanation().contains("Cluster Nodes Found [0]")); + + final boolean hasEmptyNodesResult = results.stream() + .anyMatch(r -> r.getOutcome() == FAILED && r.getExplanation().contains("No nodes found in cluster")); + assertTrue(hasEmptyNodesResult, "Should have a failure for empty nodes"); + } + + @Test + void testVerifyNodeReachabilityWithTimeout() { + final Node node = new Node(1, "broker1.example.com", 9092); + final Collection nodes = List.of(node); + + final KafkaFuture> nodesFuture = KafkaFuture.completedFuture(nodes); + when(admin.describeCluster(any(DescribeClusterOptions.class))).thenReturn(describeClusterResult); + when(describeClusterResult.nodes()).thenReturn(nodesFuture); + + socketConnector = (host, port, timeout) -> { + throw new TimeoutException("Timeout"); + }; + + final List results = verifier.verifyClusterConnectivity(admin, BOOTSTRAP_SERVERS); + + final boolean hasFailedNodeResult = results.stream() + .anyMatch(r -> r.getVerificationStepName().contains("Node 1") + && r.getVerificationStepName().contains("broker1.example.com") + && r.getOutcome() == FAILED + && r.getExplanation().matches(".*Timeout")); + + assertTrue(hasFailedNodeResult, "Should have a failed node result for Timeout"); + } + + @Test + void testVerifyBootstrapServersReachabilityWithNullServers() { + final List results = verifier.verifyClusterConnectivity(admin, null); + + final ConfigVerificationResult bootstrapResult = results.stream() + .filter(r -> r.getVerificationStepName().contains("Bootstrap Server")) + .findFirst() + .orElseThrow(); + + assertEquals(FAILED, bootstrapResult.getOutcome()); + assertTrue(bootstrapResult.getExplanation().contains("No bootstrap servers configured")); + } + + @Test + void testVerifyBootstrapServersReachabilityWithEmptyServers() { + final List results = verifier.verifyClusterConnectivity(admin, " "); + + final ConfigVerificationResult bootstrapResult = results.stream() + .filter(r -> r.getVerificationStepName().contains("Bootstrap Server")) + .findFirst() + .orElseThrow(); + + assertEquals(FAILED, bootstrapResult.getOutcome()); + assertTrue(bootstrapResult.getExplanation().contains("No bootstrap servers configured")); + } + + @Test + void testVerifyBootstrapServersReachabilityWithInvalidFormat() { + when(admin.describeCluster(any(DescribeClusterOptions.class))).thenThrow(new AuthorizationException("Unauthorized")); + + final String invalidServer = "invalid:server:format:9092"; + + final List results = verifier.verifyClusterConnectivity(admin, invalidServer); + + final ConfigVerificationResult bootstrapResult = results.stream() + .filter(r -> r.getVerificationStepName().contains("Bootstrap Server Reachability - " + invalidServer)) + .findFirst() + .orElseThrow(); + + assertEquals(FAILED, bootstrapResult.getOutcome()); + assertTrue(bootstrapResult.getExplanation().contains("Invalid format or error")); + } + + @Test + void testVerifyBootstrapServersReachabilityMultiple() { + when(admin.describeCluster(any(DescribeClusterOptions.class))).thenThrow(new AuthorizationException("Unauthorized")); + + final String multipleServers = "broker1:9092,broker2:9093,broker3:9094"; + + final List results = verifier.verifyClusterConnectivity(admin, multipleServers); + + final long bootstrapResultCount = results.stream() + .filter(r -> r.getVerificationStepName().contains("Bootstrap Server Reachability")) + .count(); + + assertTrue(bootstrapResultCount >= 3, "Should have results for all bootstrap servers"); + } +} +