Skip to content

Commit a3d61b2

Browse files
webermasterKevin Weber
andauthored
Added CDL so main thread waits for subscribed messages to complete be… (#114)
* Added CDL so main thread waits for subscribed messages to complete before closing connection Co-authored-by: Kevin Weber <[email protected]>
1 parent 71fab93 commit a3d61b2

File tree

2 files changed

+18
-12
lines changed

2 files changed

+18
-12
lines changed

samples/BasicPubSub/src/main/java/pubsub/PubSub.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import software.amazon.awssdk.iot.iotjobs.model.RejectedError;
2323

2424
import java.io.UnsupportedEncodingException;
25+
import java.nio.charset.StandardCharsets;
2526
import java.util.UUID;
2627
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.CountDownLatch;
2729
import java.util.concurrent.ExecutionException;
2830

2931
class PubSub {
@@ -291,13 +293,12 @@ public void onConnectionResumed(boolean sessionPresent) {
291293
throw new RuntimeException("Exception occurred during connect", ex);
292294
}
293295

296+
CountDownLatch countDownLatch = new CountDownLatch(messagesToPublish);
297+
294298
CompletableFuture<Integer> subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, (message) -> {
295-
try {
296-
String payload = new String(message.getPayload(), "UTF-8");
297-
System.out.println("MESSAGE: " + payload);
298-
} catch (UnsupportedEncodingException ex) {
299-
System.out.println("Unable to decode payload: " + ex.getMessage());
300-
}
299+
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
300+
System.out.println("MESSAGE: " + payload);
301+
countDownLatch.countDown();
301302
});
302303

303304
subscribed.get();
@@ -308,6 +309,8 @@ public void onConnectionResumed(boolean sessionPresent) {
308309
published.get();
309310
Thread.sleep(1000);
310311
}
312+
313+
countDownLatch.await();
311314

312315
CompletableFuture<Void> disconnected = connection.disconnect();
313316
disconnected.get();

samples/RawPubSub/src/main/java/rawpubsub/RawPubSub.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
import software.amazon.awssdk.iot.iotjobs.model.RejectedError;
1717

1818
import java.io.UnsupportedEncodingException;
19+
import java.nio.charset.StandardCharsets;
1920
import java.util.Arrays;
2021
import java.util.List;
2122
import java.util.UUID;
2223
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CountDownLatch;
2325
import java.util.concurrent.ExecutionException;
2426

2527
class RawPubSub {
@@ -216,13 +218,12 @@ public void onConnectionResumed(boolean sessionPresent) {
216218
throw new RuntimeException("Exception occurred during connect", ex);
217219
}
218220

221+
CountDownLatch countDownLatch = new CountDownLatch(messagesToPublish);
222+
219223
CompletableFuture<Integer> subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, (message) -> {
220-
try {
221-
String payload = new String(message.getPayload(), "UTF-8");
222-
System.out.println("MESSAGE: " + payload);
223-
} catch (UnsupportedEncodingException ex) {
224-
System.out.println("Unable to decode payload: " + ex.getMessage());
225-
}
224+
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
225+
System.out.println("MESSAGE: " + payload);
226+
countDownLatch.countDown();
226227
});
227228

228229
subscribed.get();
@@ -233,6 +234,8 @@ public void onConnectionResumed(boolean sessionPresent) {
233234
published.get();
234235
Thread.sleep(1000);
235236
}
237+
238+
countDownLatch.await();
236239

237240
CompletableFuture<Void> disconnected = connection.disconnect();
238241
disconnected.get();

0 commit comments

Comments
 (0)