diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala index e2db135124459..ac93db3b7e560 100644 --- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala @@ -37,7 +37,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} import java.io.IOException -import java.net.{InetAddress, Socket} +import java.net.{InetAddress, InetSocketAddress, Socket} import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import java.util.Properties import scala.collection.Map @@ -49,7 +49,23 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { val topic = "test" val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) - val localAddress = InetAddress.getByName("127.0.0.1") + private lazy val localAddress: InetAddress = { + // This runs only after the broker is up (first use happens post setUp()). + def canConnect(host: String): Boolean = { + try { + val ln = ListenerName.normalised("PLAINTEXT") + val port = brokers.head.socketServer.boundPort(ln) + val s = new Socket() + s.connect(new InetSocketAddress(InetAddress.getByName(host), port), /*timeoutMs=*/500) + s.close() + true + } catch { + case _: Throwable => false + } + } + if (canConnect("::1")) InetAddress.getByName("::1") else InetAddress.getByName("127.0.0.1") + } + val unknownHost = "255.255.0.1" val plaintextListenerDefaultQuota = 30 var executor: ExecutorService = _ @@ -66,6 +82,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { admin = createAdminClient(listener) TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers) topicId = TestUtils.describeTopic(admin, topic).topicId() + val _ = localAddress + TestUtils.waitUntilTrue(() => true, "tick") } @AfterEach @@ -101,10 +119,10 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { verifyMaxConnections(maxConnectionsPerIP, connectAndVerify) - // Increase MaxConnectionsPerIpOverrides for localhost to 7 + // Increase MaxConnectionsPerIpOverrides for localAddress.getHostAddress to 7 val maxConnectionsPerIPOverride = 7 - props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"localhost:$maxConnectionsPerIPOverride") - reconfigureServers(props, perBrokerConfig = false, (SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"localhost:$maxConnectionsPerIPOverride")) + props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"${localAddress.getHostAddress}:$maxConnectionsPerIPOverride") + reconfigureServers(props, perBrokerConfig = false, (SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"${localAddress.getHostAddress}:$maxConnectionsPerIPOverride")) verifyMaxConnections(maxConnectionsPerIPOverride, connectAndVerify) } @@ -318,7 +336,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { def connect(listener: String): Socket = { val listenerName = ListenerName.normalised(listener) - new Socket("localhost", brokers.head.socketServer.boundPort(listenerName)) + // Dial the exact literal we count against to avoid IPv4/IPv6 mismatch + new Socket(localAddress.getHostAddress, brokers.head.socketServer.boundPort(listenerName)) } private def createAndVerifyConnection(listener: String = "PLAINTEXT"): Unit = { @@ -334,7 +353,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { val produceResponse = sendAndReceive[ProduceResponse](produceRequest, socket) assertEquals(1, produceResponse.data.responses.size) val topicProduceResponse = produceResponse.data.responses.asScala.head - assertEquals(1, topicProduceResponse.partitionResponses.size) + assertEquals(1, topicProduceResponse.partitionResponses.size) val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode)) }