Skip to content

Commit 1176393

Browse files
authored
Merge pull request #238 from rabbitmq/producer-message-count-option
Add option for producer message count
2 parents 01cece3 + d281c83 commit 1176393

File tree

3 files changed

+196
-6
lines changed

3 files changed

+196
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Performance Testing Tool, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.perf;
16+
17+
import java.time.Duration;
18+
import java.util.Map;
19+
import java.util.concurrent.ConcurrentMap;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import org.slf4j.Logger;
24+
25+
interface CompletionHandler {
26+
27+
Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CompletionHandler.class);
28+
29+
void waitForCompletion() throws InterruptedException;
30+
31+
void countDown(String reason);
32+
33+
final class DefaultCompletionHandler implements CompletionHandler {
34+
35+
private static final String STOP_REASON_REACHED_TIME_LIMIT = "Reached time limit";
36+
37+
private final Duration timeLimit;
38+
private final CountDownLatch latch;
39+
private final ConcurrentMap<String, Integer> reasons;
40+
private final AtomicBoolean completed = new AtomicBoolean(false);
41+
42+
DefaultCompletionHandler(
43+
int timeLimitSeconds, int countLimit, ConcurrentMap<String, Integer> reasons) {
44+
this.timeLimit = Duration.ofSeconds(timeLimitSeconds);
45+
int count = countLimit <= 0 ? 1 : countLimit;
46+
LOGGER.debug("Count completion limit is {}", count);
47+
this.latch = new CountDownLatch(count);
48+
this.reasons = reasons;
49+
}
50+
51+
@Override
52+
public void waitForCompletion() throws InterruptedException {
53+
if (timeLimit.isNegative() || timeLimit.isZero()) {
54+
this.latch.await();
55+
completed.set(true);
56+
} else {
57+
boolean countedDown = this.latch.await(timeLimit.toMillis(), TimeUnit.MILLISECONDS);
58+
completed.set(true);
59+
if (LOGGER.isDebugEnabled()) {
60+
LOGGER.debug("Completed, counted down? {}", countedDown);
61+
}
62+
if (!countedDown) {
63+
recordReason(reasons, STOP_REASON_REACHED_TIME_LIMIT);
64+
}
65+
}
66+
}
67+
68+
@Override
69+
public void countDown(String reason) {
70+
if (LOGGER.isDebugEnabled()) {
71+
LOGGER.debug("Counting down ({})", reason);
72+
}
73+
if (!completed.get()) {
74+
recordReason(reasons, reason);
75+
latch.countDown();
76+
}
77+
}
78+
}
79+
80+
/**
81+
* This completion handler waits forever, but it can be counted down, typically when a producer or
82+
* a consumer fails. This avoids PerfTest hanging after a failure.
83+
*/
84+
final class NoLimitCompletionHandler implements CompletionHandler {
85+
86+
private final CountDownLatch latch = new CountDownLatch(1);
87+
private final ConcurrentMap<String, Integer> reasons;
88+
89+
NoLimitCompletionHandler(ConcurrentMap<String, Integer> reasons) {
90+
this.reasons = reasons;
91+
}
92+
93+
@Override
94+
public void waitForCompletion() throws InterruptedException {
95+
latch.await();
96+
}
97+
98+
@Override
99+
public void countDown(String reason) {
100+
if (LOGGER.isDebugEnabled()) {
101+
LOGGER.debug("Counting down ({})", reason);
102+
}
103+
recordReason(reasons, reason);
104+
latch.countDown();
105+
}
106+
}
107+
108+
private static void recordReason(Map<String, Integer> reasons, String reason) {
109+
reasons.compute(reason, (keyReason, count) -> count == null ? 1 : count + 1);
110+
}
111+
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+49-6
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
import java.util.Locale;
6969
import java.util.Map;
7070
import java.util.concurrent.Callable;
71+
import java.util.concurrent.ConcurrentHashMap;
72+
import java.util.concurrent.ConcurrentMap;
7173
import java.util.concurrent.CountDownLatch;
7274
import java.util.concurrent.ExecutorService;
7375
import java.util.concurrent.Executors;
@@ -668,6 +670,13 @@ void setTcpNoDelay(String input) throws Exception {
668670
defaultValue = "0")
669671
private Duration consumerLatency;
670672

673+
@CommandLine.Option(
674+
names = {"--pmessages", "-C"},
675+
description = "producer message count, default is 0 (no limit)",
676+
defaultValue = "0",
677+
converter = Converters.GreaterThanOrEqualToZeroIntegerTypeConverter.class)
678+
private long pmessages;
679+
671680
private MetricsCollector metricsCollector;
672681
private PerformanceMetrics performanceMetrics;
673682
private List<Monitoring> monitorings;
@@ -1046,6 +1055,16 @@ public Integer call() throws Exception {
10461055
}));
10471056
}
10481057

1058+
CompletionHandler completionHandler;
1059+
ConcurrentMap<String, Integer> completionReasons = new ConcurrentHashMap<>();
1060+
if (isRunTimeLimited() || this.pmessages > 0) {
1061+
completionHandler =
1062+
new CompletionHandler.DefaultCompletionHandler(
1063+
this.time, this.producers, completionReasons);
1064+
} else {
1065+
completionHandler = new CompletionHandler.NoLimitCompletionHandler(completionReasons);
1066+
}
1067+
10491068
List<Producer> producers = Collections.synchronizedList(new ArrayList<>(this.producers));
10501069
List<Runnable> producerRunnables =
10511070
IntStream.range(0, this.producers)
@@ -1135,11 +1154,38 @@ public Integer call() throws Exception {
11351154
} else {
11361155
latencyCallback = msg -> {};
11371156
}
1157+
1158+
if (this.confirmLatency) {
1159+
producerBuilder.confirmTimeout(Duration.ofSeconds(0));
1160+
}
1161+
1162+
Runnable messagePublishedCallback, messageConfirmedCallback;
1163+
if (this.pmessages > 0) {
1164+
AtomicLong messageCount = new AtomicLong(0);
1165+
messagePublishedCallback =
1166+
() -> {
1167+
if (messageCount.incrementAndGet() == this.pmessages) {
1168+
Thread.currentThread().interrupt();
1169+
}
1170+
};
1171+
AtomicLong messageConfirmedCount = new AtomicLong(0);
1172+
messageConfirmedCallback =
1173+
() -> {
1174+
if (messageConfirmedCount.incrementAndGet() == this.pmessages) {
1175+
completionHandler.countDown("Producer reached message limit");
1176+
}
1177+
};
1178+
} else {
1179+
messagePublishedCallback = () -> {};
1180+
messageConfirmedCallback = () -> {};
1181+
}
1182+
11381183
ConfirmationHandler confirmationHandler =
11391184
confirmationStatus -> {
11401185
if (confirmationStatus.isConfirmed()) {
11411186
producerConfirm.increment();
11421187
latencyCallback.accept(confirmationStatus.getMessage());
1188+
messageConfirmedCallback.run();
11431189
}
11441190
};
11451191

@@ -1164,6 +1210,7 @@ public Integer call() throws Exception {
11641210
messageBuilderConsumer.accept(messageBuilder);
11651211
producer.send(
11661212
messageBuilder.addData(payload).build(), confirmationHandler);
1213+
messagePublishedCallback.run();
11671214
}
11681215
} catch (Exception e) {
11691216
if (e.getCause() != null
@@ -1312,12 +1359,8 @@ public Integer call() throws Exception {
13121359
Thread shutdownHook = new Thread(latch::countDown);
13131360
Runtime.getRuntime().addShutdownHook(shutdownHook);
13141361
try {
1315-
if (isRunTimeLimited()) {
1316-
boolean completed = latch.await(this.time, TimeUnit.SECONDS);
1317-
LOGGER.debug("Completion latch reached 0: {}", completed);
1318-
} else {
1319-
latch.await();
1320-
}
1362+
completionHandler.waitForCompletion();
1363+
LOGGER.debug("Completion with reason(s): {}", completionReasons);
13211364
Runtime.getRuntime().removeShutdownHook(shutdownHook);
13221365
} catch (InterruptedException e) {
13231366
// moving on to the closing sequence

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+36
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.rabbitmq.stream.AddressResolver;
2222
import com.rabbitmq.stream.ByteCapacity;
2323
import com.rabbitmq.stream.Constants;
24+
import com.rabbitmq.stream.Environment;
25+
import com.rabbitmq.stream.OffsetSpecification;
2426
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
2527
import com.rabbitmq.stream.compression.Compression;
2628
import com.rabbitmq.stream.impl.Client;
@@ -51,6 +53,7 @@
5153
import java.util.concurrent.Executors;
5254
import java.util.concurrent.Future;
5355
import java.util.concurrent.atomic.AtomicInteger;
56+
import java.util.concurrent.atomic.AtomicLong;
5457
import java.util.concurrent.atomic.AtomicReference;
5558
import java.util.function.Consumer;
5659
import java.util.stream.Collectors;
@@ -65,6 +68,8 @@
6568
import org.junit.jupiter.api.condition.EnabledOnOs;
6669
import org.junit.jupiter.api.condition.OS;
6770
import org.junit.jupiter.api.extension.ExtendWith;
71+
import org.junit.jupiter.params.ParameterizedTest;
72+
import org.junit.jupiter.params.provider.CsvSource;
6873

6974
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
7075
public class StreamPerfTestTest {
@@ -527,6 +532,32 @@ void shouldNotFailWhenFilteringIsActivated() throws Exception {
527532
assertThat(consoleOutput()).containsIgnoringCase("summary:");
528533
}
529534

535+
@ParameterizedTest
536+
@CsvSource({
537+
"200, 1", "200, 2",
538+
})
539+
void shouldPublishExpectedNumberOfMessagesIfOptionIsSet(long pmessages, int producerCount)
540+
throws Exception {
541+
long expectedMessageCount = pmessages * producerCount;
542+
run(builder().pmessages(pmessages).producers(producerCount));
543+
waitUntilStreamExists(s);
544+
waitRunEnds();
545+
AtomicLong receivedCount = new AtomicLong();
546+
AtomicLong lastCommittedChunkId = new AtomicLong();
547+
try (Environment env = Environment.builder().build()) {
548+
env.consumerBuilder().stream(s)
549+
.offset(OffsetSpecification.first())
550+
.messageHandler(
551+
(ctx, msg) -> {
552+
lastCommittedChunkId.set(ctx.committedChunkId());
553+
receivedCount.incrementAndGet();
554+
})
555+
.build();
556+
waitAtMost(() -> receivedCount.get() == expectedMessageCount);
557+
waitAtMost(() -> lastCommittedChunkId.get() == env.queryStreamStats(s).committedChunkId());
558+
}
559+
}
560+
530561
private static <T> Consumer<T> wrap(CallableConsumer<T> action) {
531562
return t -> {
532563
try {
@@ -759,6 +790,11 @@ ArgumentsBuilder filterValues(String... values) {
759790
return this;
760791
}
761792

793+
ArgumentsBuilder pmessages(long messages) {
794+
arguments.put("pmessages", String.valueOf(messages));
795+
return this;
796+
}
797+
762798
String build() {
763799
return this.arguments.entrySet().stream()
764800
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))

0 commit comments

Comments
 (0)