diff --git a/src/main/java/io/vertx/core/net/TrafficShapingOptions.java b/src/main/java/io/vertx/core/net/TrafficShapingOptions.java index 1e1f33f2e0c..e978ad27dcd 100644 --- a/src/main/java/io/vertx/core/net/TrafficShapingOptions.java +++ b/src/main/java/io/vertx/core/net/TrafficShapingOptions.java @@ -222,4 +222,28 @@ public long getCheckIntervalForStats() { public TimeUnit getCheckIntervalForStatsTimeUnit() { return checkIntervalForStatsTimeUnit; } + + @Override + public boolean equals(Object obj) { + TrafficShapingOptions that = (TrafficShapingOptions) obj; + return inboundGlobalBandwidth == that.inboundGlobalBandwidth && + outboundGlobalBandwidth == that.outboundGlobalBandwidth && + peakOutboundGlobalBandwidth == that.peakOutboundGlobalBandwidth && + maxDelayToWait == that.maxDelayToWait && + maxDelayToWaitTimeUnit == that.maxDelayToWaitTimeUnit && + checkIntervalForStats == that.checkIntervalForStats && + checkIntervalForStatsTimeUnit == that.checkIntervalForStatsTimeUnit; + } + + @Override + public int hashCode() { + return Objects.hash(inboundGlobalBandwidth, + outboundGlobalBandwidth, + peakOutboundGlobalBandwidth, + maxDelayToWait, + maxDelayToWaitTimeUnit, + checkIntervalForStats, + checkIntervalForStatsTimeUnit); + } + } diff --git a/src/main/java/io/vertx/core/net/impl/TCPServerBase.java b/src/main/java/io/vertx/core/net/impl/TCPServerBase.java index 08cac0c2773..9f98235769e 100644 --- a/src/main/java/io/vertx/core/net/impl/TCPServerBase.java +++ b/src/main/java/io/vertx/core/net/impl/TCPServerBase.java @@ -153,23 +153,32 @@ public void updateTrafficShapingOptions(TrafficShapingOptions options) { if (options == null) { throw new IllegalArgumentException("Invalid null value passed for traffic shaping options update"); } - if (trafficShapingHandler == null) { - throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " + - "to use traffic shaping during startup"); - } TCPServerBase server = actualServer; + // Update the traffic shaping options only for the actual/main server if (server != null && server != this) { server.updateTrafficShapingOptions(options); } else { - long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats()); - trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis); + if (trafficShapingHandler == null) { + throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " + + "to use traffic shaping during startup"); + } - if (options.getPeakOutboundGlobalBandwidth() != 0) { - trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth()); + // Compare with existing traffic-shaping options to ensure they are updated only when they differ. + if(!options.equals(server.options.getTrafficShapingOptions())) { + server.options.setTrafficShapingOptions(options); + long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats()); + trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis); + + if (options.getPeakOutboundGlobalBandwidth() != 0) { + trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth()); + } + if (options.getMaxDelayToWait() != 0) { + long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait()); + trafficShapingHandler.setMaxWriteDelay(maxDelayToWaitInMillis); + } } - if (options.getMaxDelayToWait() != 0) { - long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait()); - trafficShapingHandler.setMaxWriteDelay(maxDelayToWaitInMillis); + else { + log.info("Not updating traffic shaping options as they have not changed"); } } } @@ -292,8 +301,10 @@ private synchronized Future listen(SocketAddress localAddress, ContextI } else { // Server already exists with that host/port - we will use that actualServer = main; - metrics = main.metrics; - childHandler = childHandler(listenContext, localAddress, main.trafficShapingHandler); + metrics = actualServer.metrics; + // Ensure the workers inherit the actual server's traffic-shaping handler + trafficShapingHandler = actualServer.trafficShapingHandler; + childHandler = childHandler(listenContext, localAddress, actualServer.trafficShapingHandler); worker = ch -> childHandler.accept(ch, actualServer.sslChannelProvider.result().sslChannelProvider()); actualServer.servers.add(this); actualServer.channelBalancer.addWorker(eventLoop, worker); diff --git a/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java b/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java index f8f9fb21331..ba283152e77 100644 --- a/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java +++ b/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java @@ -13,9 +13,10 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -204,6 +205,54 @@ public void start(Promise startPromise) { Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); // because there are simultaneous 2 requests } + @Test + public void testDynamicOutboundRateUpdateSharedServers() throws IOException, InterruptedException + { + int numEventLoops = 5; // We start a shared TCP server with 2 event-loops + List servers = new ArrayList<>(); + Future listenLatch = vertx.deployVerticle(() -> new AbstractVerticle() { + @Override + public void start(Promise startPromise) { + HttpServer testServer = serverFactory.apply(vertx); + servers.add(testServer); + testServer.requestHandler(HANDLERS.getFile(sampleF)) + .listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST).mapEmpty().onComplete(startPromise); + } + }, new DeploymentOptions().setInstances(numEventLoops)); + + HttpClient testClient = clientFactory.apply(vertx); + CountDownLatch waitForResponse = new CountDownLatch(2); + AtomicLong startTime = new AtomicLong(); + AtomicLong totalReceivedLength = new AtomicLong(); + long expectedLength = Files.size(Paths.get(sampleF.getAbsolutePath())); + listenLatch.onComplete(onSuccess(v -> { + startTime.set(System.nanoTime()); + for (int i = 0; i < 2; i++) { + testClient.request(HttpMethod.GET, DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/get-file") + .compose(req -> req.send() + .andThen(onSuccess(resp -> assertEquals(200, resp.statusCode()))) + .compose(HttpClientResponse::body)) + .onComplete(onSuccess(body -> { + long receivedBytes = body.getBytes().length; + totalReceivedLength.addAndGet(receivedBytes); + Assert.assertEquals(expectedLength, receivedBytes); + waitForResponse.countDown(); + })); + } + })); + awaitLatch(waitForResponse); + TrafficShapingOptions updatedTrafficOptions = new TrafficShapingOptions() + .setInboundGlobalBandwidth(INBOUND_LIMIT) // unchanged + .setOutboundGlobalBandwidth(2 * OUTBOUND_LIMIT); + + for (int i = 0; i < numEventLoops; i++) { + servers.forEach(s -> s.updateTrafficShapingOptions(updatedTrafficOptions)); + } + + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get()); + Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); + } + @Test public void testDynamicOutboundRateUpdate() throws Exception { Buffer expectedBuffer = TestUtils.randomBuffer(TEST_CONTENT_SIZE);