1919import org .slf4j .Logger ;
2020import org .slf4j .LoggerFactory ;
2121import tech .ydb .core .grpc .GrpcTransport ;
22- import tech .ydb .examples .SimpleExample ;
2322import tech .ydb .topic .TopicClient ;
2423import tech .ydb .topic .description .Codec ;
2524import tech .ydb .topic .read .AsyncReader ;
4039/**
4140 * @author Nikolay Perfilov
4241 */
43- public class ReadWriteWorkload extends SimpleExample {
42+ public class ReadWriteWorkload extends SimpleTopicExample {
4443 private static final Logger logger = LoggerFactory .getLogger (ReadWriteWorkload .class );
4544 private static final int WRITE_TIMEOUT_SECONDS = 60 ;
4645 private static final int MESSAGE_LENGTH_BYTES = 10_000_000 ; // 10 Mb
@@ -53,12 +52,11 @@ public class ReadWriteWorkload extends SimpleExample {
5352 private final AtomicInteger messagesReceived = new AtomicInteger (0 );
5453 private final AtomicInteger messagesCommitted = new AtomicInteger (0 );
5554 private final AtomicLong bytesWritten = new AtomicLong (0 );
56- private long lastSeqNo = -1 ;
5755 CountDownLatch writeFinishedLatch = new CountDownLatch (1 );
5856 CountDownLatch readFinishedLatch = new CountDownLatch (1 );
5957
6058 @ Override
61- protected void run (GrpcTransport transport , String pathPrefix ) {
59+ protected void run (GrpcTransport transport ) {
6260
6361 ExecutorService compressionExecutor = Executors .newFixedThreadPool (10 );
6462 AtomicBoolean timeToStopWriting = new AtomicBoolean (false );
@@ -154,9 +152,8 @@ protected void run(GrpcTransport transport, String pathPrefix) {
154152 };
155153
156154 Runnable readingThread = () -> {
157- String consumerName = "consumer1" ;
158155 ReaderSettings readerSettings = ReaderSettings .newBuilder ()
159- .setConsumerName (consumerName )
156+ .setConsumerName (CONSUMER_NAME )
160157 .addTopic (TopicReadSettings .newBuilder ()
161158 .setPath (TOPIC_NAME )
162159 .setReadFrom (Instant .now ().minus (Duration .ofHours (24 )))
@@ -274,12 +271,6 @@ public void onMessages(DataReceivedEvent event) {
274271 } else {
275272 logger .debug ("Message received. SeqNo={}, offset={}" , message .getSeqNo (), message .getOffset ());
276273 }
277- if (lastSeqNo > message .getSeqNo ()) {
278- logger .error ("Received a message with seqNo {}. Previously got a message with seqNo {}" ,
279- message .getSeqNo (), lastSeqNo );
280- } else {
281- lastSeqNo = message .getSeqNo ();
282- }
283274 message .commit ().thenRun (() -> {
284275 logger .trace ("Message committed" );
285276 unreadMessagesCount .decrementAndGet ();
0 commit comments