@@ -232,16 +232,17 @@ private Future<Void> cloudRefresh() {
232
232
233
233
Promise <Void > refreshPromise = Promise .promise ();
234
234
this .isRefreshing = true ;
235
- vertx .< Void > executeBlocking (blockBromise -> {
235
+ vertx .executeBlocking (() -> {
236
236
this .cloudRefreshEnsureInSync (refreshPromise , 0 );
237
- blockBromise . complete () ;
238
- }, ar -> {} );
237
+ return null ;
238
+ });
239
239
240
240
return refreshPromise .future ()
241
241
.onComplete (v -> {
242
242
this .isRefreshing = false ;
243
243
emitRefreshedEvent ();
244
244
});
245
+
245
246
}
246
247
247
248
// this is a blocking function
@@ -320,12 +321,13 @@ private void handleUpload(Message<String> msg) {
320
321
this .pendingUpload .add (fileToUpload );
321
322
}
322
323
323
- this .uploadExecutor .<Void >executeBlocking (
324
- promise -> this .cloudUploadBlocking (promise , msg .body ()),
325
- ar -> {
326
- this .pendingUpload .remove (fileToUpload );
327
- this .handleAsyncResult (ar );
328
- msg .reply (ar .succeeded ());
324
+ this .uploadExecutor .executeBlocking (() -> {
325
+ this .cloudUploadBlocking (msg .body ());
326
+ return null ;
327
+ }).onComplete (ar -> {
328
+ this .pendingUpload .remove (fileToUpload );
329
+ this .handleAsyncResult (ar );
330
+ msg .reply (ar .succeeded ());
329
331
330
332
// increase counter
331
333
if (ar .succeeded ()) {
@@ -338,16 +340,10 @@ private void handleUpload(Message<String> msg) {
338
340
});
339
341
}
340
342
341
- private void cloudUploadBlocking (Promise <Void > promise , String fileToUpload ) {
342
- try {
343
- String cloudPath = this .cloudSync .toCloudPath (fileToUpload );
344
- try (InputStream localInput = this .localStorage .download (fileToUpload )) {
345
- this .cloudStorage .upload (localInput , cloudPath );
346
- }
347
- promise .complete ();
348
- } catch (Exception ex ) {
349
- LOGGER .error (ex .getMessage (), ex );
350
- promise .fail (new Throwable (ex ));
343
+ private void cloudUploadBlocking (String fileToUpload ) throws Exception {
344
+ String cloudPath = this .cloudSync .toCloudPath (fileToUpload );
345
+ try (InputStream localInput = this .localStorage .download (fileToUpload )) {
346
+ this .cloudStorage .upload (localInput , cloudPath );
351
347
}
352
348
}
353
349
@@ -364,9 +360,10 @@ private Future<Void> cloudDownloadFile(String s3Path) {
364
360
}
365
361
366
362
Promise <Void > promise = Promise .promise ();
367
- this .downloadExecutor .<Void >executeBlocking (
368
- blockingPromise -> this .cloudDownloadBlocking (blockingPromise , s3Path ),
369
- ar -> {
363
+ this .downloadExecutor .executeBlocking (() -> {
364
+ this .cloudDownloadBlocking (s3Path );
365
+ return null ;
366
+ }).onComplete (ar -> {
370
367
this .pendingDownload .remove (s3Path );
371
368
this .handleAsyncResult (ar );
372
369
promise .complete ();
@@ -385,7 +382,7 @@ private Future<Void> cloudDownloadFile(String s3Path) {
385
382
return promise .future ();
386
383
}
387
384
388
- private void cloudDownloadBlocking (Promise < Void > promise , String s3Path ) {
385
+ private void cloudDownloadBlocking (String s3Path ) throws Exception {
389
386
final long cloudDownloadStart = System .nanoTime ();
390
387
try {
391
388
String localPath = this .cloudSync .toLocalPath (s3Path );
@@ -398,15 +395,14 @@ private void cloudDownloadBlocking(Promise<Void> promise, String s3Path) {
398
395
downloadSuccessTimer .record (java .time .Duration .ofMillis (cloudDownloadTimeMs ));
399
396
LOGGER .info ("S3 download completed: {} in {} ms" , cloudStorage .mask (s3Path ), cloudDownloadTimeMs );
400
397
}
401
- promise .complete ();
402
398
} catch (Exception ex ) {
403
399
final long cloudDownloadEnd = System .nanoTime ();
404
400
final long cloudDownloadTimeMs = (cloudDownloadEnd - cloudDownloadStart ) / 1_000_000 ;
405
401
406
402
downloadFailureTimer .record (java .time .Duration .ofMillis (cloudDownloadTimeMs ));
407
403
// Be careful as the s3Path may contain the pre-signed S3 token, so do not log the whole path
408
404
LOGGER .error ("download error: " + ex .getClass ().getSimpleName ());
409
- promise . fail ( new Throwable ( ex )) ;
405
+ throw ex ;
410
406
}
411
407
}
412
408
0 commit comments