Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}

import java.io.IOException
import java.net.{InetAddress, Socket}
import java.net.{InetAddress, InetSocketAddress, Socket}
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.Properties
import scala.collection.Map
Expand All @@ -49,7 +49,23 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {

val topic = "test"
val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val localAddress = InetAddress.getByName("127.0.0.1")
private lazy val localAddress: InetAddress = {
// This runs only after the broker is up (first use happens post setUp()).
def canConnect(host: String): Boolean = {
try {
val ln = ListenerName.normalised("PLAINTEXT")
val port = brokers.head.socketServer.boundPort(ln)
val s = new Socket()
s.connect(new InetSocketAddress(InetAddress.getByName(host), port), /*timeoutMs=*/500)
s.close()
true
} catch {
case _: Throwable => false
}
}
if (canConnect("::1")) InetAddress.getByName("::1") else InetAddress.getByName("127.0.0.1")
}

val unknownHost = "255.255.0.1"
val plaintextListenerDefaultQuota = 30
var executor: ExecutorService = _
Expand All @@ -66,6 +82,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
admin = createAdminClient(listener)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers)
topicId = TestUtils.describeTopic(admin, topic).topicId()
val _ = localAddress
TestUtils.waitUntilTrue(() => true, "tick")
}

@AfterEach
Expand Down Expand Up @@ -101,10 +119,10 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {

verifyMaxConnections(maxConnectionsPerIP, connectAndVerify)

// Increase MaxConnectionsPerIpOverrides for localhost to 7
// Increase MaxConnectionsPerIpOverrides for localAddress.getHostAddress to 7
val maxConnectionsPerIPOverride = 7
props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"localhost:$maxConnectionsPerIPOverride")
reconfigureServers(props, perBrokerConfig = false, (SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"localhost:$maxConnectionsPerIPOverride"))
props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"${localAddress.getHostAddress}:$maxConnectionsPerIPOverride")
reconfigureServers(props, perBrokerConfig = false, (SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"${localAddress.getHostAddress}:$maxConnectionsPerIPOverride"))

verifyMaxConnections(maxConnectionsPerIPOverride, connectAndVerify)
}
Expand Down Expand Up @@ -318,7 +336,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {

def connect(listener: String): Socket = {
val listenerName = ListenerName.normalised(listener)
new Socket("localhost", brokers.head.socketServer.boundPort(listenerName))
// Dial the exact literal we count against to avoid IPv4/IPv6 mismatch
new Socket(localAddress.getHostAddress, brokers.head.socketServer.boundPort(listenerName))
}

private def createAndVerifyConnection(listener: String = "PLAINTEXT"): Unit = {
Expand All @@ -334,7 +353,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
val produceResponse = sendAndReceive[ProduceResponse](produceRequest, socket)
assertEquals(1, produceResponse.data.responses.size)
val topicProduceResponse = produceResponse.data.responses.asScala.head
assertEquals(1, topicProduceResponse.partitionResponses.size)
assertEquals(1, topicProduceResponse.partitionResponses.size)
val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode))
}
Expand Down