From 80c7935910a58a6c867239b7db28c6ba289e353a Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 19 Dec 2024 12:52:37 -0500 Subject: [PATCH 1/2] Call executorService.submit() in MultiThreadedS3FileUploader --- .../uploader/MultiThreadedS3FileUploader.java | 76 +++++++++---------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java index 3589889..f399f90 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java @@ -63,47 +63,47 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb String s3Key = String.format("%s/%s", s3Prefix, subpath); long queueTime = System.currentTimeMillis(); - PutObjectRequest putObjectRequest = PutObjectRequest.builder() - .bucket(s3Bucket) - .key(s3Key) - // Changing checksum algorithm does not seem to - // have any impact regarding seeing CPU intensive - // sun/security/provider/MD5.implCompress - // that is observed in the flame graph. - //.checksumAlgorithm(ChecksumAlgorithm.CRC32_C) - .build(); - CompletableFuture future; String uploadPathString = String.format("s3://%s/%s", s3Bucket, s3Key); - uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback - try { - LOG.info(String.format("Submitting upload of %s --> %s", uploadTask.getAbsolutePath(), uploadPathString)); - future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath()); - } catch (Exception e) { - long timeSpentMs = System.currentTimeMillis() - queueTime; - LOG.warn(String.format("Caught exception during putObject for %s --> %s in %dms", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), e); - int errorCode = UPLOAD_GENERAL_ERROR_CODE; - if (Utils.isAssignableFromRecursive(e, NoSuchFileException.class)) { - errorCode = UPLOAD_FILE_NOT_FOUND_ERROR_CODE; - } - s3UploadCallback.onCompletion(uploadTask, timeSpentMs, e, errorCode); - return; - } - future.whenComplete((putObjectResponse, throwable) -> { - long timeSpentMs = System.currentTimeMillis() - queueTime; - if (throwable != null) { - LOG.warn(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable); + LOG.info(String.format("Submitting upload of %s --> %s", uploadTask.getAbsolutePath(), uploadPathString)); + executorService.submit(() -> { + try { + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(s3Bucket) + .key(s3Key) + // Changing checksum algorithm does not seem to + // have any impact regarding seeing CPU intensive + // sun/security/provider/MD5.implCompress + // that is observed in the flame graph. + //.checksumAlgorithm(ChecksumAlgorithm.CRC32_C) + .build(); + uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback + CompletableFuture future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath()); + future.whenComplete((putObjectResponse, throwable) -> { + long timeSpentMs = System.currentTimeMillis() - queueTime; + if (throwable != null) { + LOG.warn(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable); - int errorCode = getErrorCode(throwable, putObjectResponse); + int errorCode = getErrorCode(throwable, putObjectResponse); - s3UploadCallback.onCompletion( - uploadTask, - timeSpentMs, - throwable, - errorCode - ); - } else { - LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs)); - s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode()); + s3UploadCallback.onCompletion( + uploadTask, + timeSpentMs, + throwable, + errorCode + ); + } else { + LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs)); + s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode()); + } + }); + } catch (Exception e) { + long timeSpentMs = System.currentTimeMillis() - queueTime; + LOG.warn(String.format("Caught exception during putObject for %s --> %s in %dms", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), e); + int errorCode = UPLOAD_GENERAL_ERROR_CODE; + if (Utils.isAssignableFromRecursive(e, NoSuchFileException.class)) { + errorCode = UPLOAD_FILE_NOT_FOUND_ERROR_CODE; + } + s3UploadCallback.onCompletion(uploadTask, timeSpentMs, e, errorCode); } }); } From fde44958d28c34e9630ef4edfcee4f33a84c78e6 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 19 Dec 2024 12:59:49 -0500 Subject: [PATCH 2/2] Fix order of calls --- .../uploader/MultiThreadedS3FileUploader.java | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java index f399f90..cd60f69 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java @@ -66,36 +66,19 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb String uploadPathString = String.format("s3://%s/%s", s3Bucket, s3Key); LOG.info(String.format("Submitting upload of %s --> %s", uploadTask.getAbsolutePath(), uploadPathString)); executorService.submit(() -> { + CompletableFuture future; + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(s3Bucket) + .key(s3Key) + // Changing checksum algorithm does not seem to + // have any impact regarding seeing CPU intensive + // sun/security/provider/MD5.implCompress + // that is observed in the flame graph. + //.checksumAlgorithm(ChecksumAlgorithm.CRC32_C) + .build(); + uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback try { - PutObjectRequest putObjectRequest = PutObjectRequest.builder() - .bucket(s3Bucket) - .key(s3Key) - // Changing checksum algorithm does not seem to - // have any impact regarding seeing CPU intensive - // sun/security/provider/MD5.implCompress - // that is observed in the flame graph. - //.checksumAlgorithm(ChecksumAlgorithm.CRC32_C) - .build(); - uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback - CompletableFuture future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath()); - future.whenComplete((putObjectResponse, throwable) -> { - long timeSpentMs = System.currentTimeMillis() - queueTime; - if (throwable != null) { - LOG.warn(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable); - - int errorCode = getErrorCode(throwable, putObjectResponse); - - s3UploadCallback.onCompletion( - uploadTask, - timeSpentMs, - throwable, - errorCode - ); - } else { - LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs)); - s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode()); - } - }); + future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath()); } catch (Exception e) { long timeSpentMs = System.currentTimeMillis() - queueTime; LOG.warn(String.format("Caught exception during putObject for %s --> %s in %dms", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), e); @@ -104,7 +87,26 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb errorCode = UPLOAD_FILE_NOT_FOUND_ERROR_CODE; } s3UploadCallback.onCompletion(uploadTask, timeSpentMs, e, errorCode); + return; } + future.whenComplete((putObjectResponse, throwable) -> { + long timeSpentMs = System.currentTimeMillis() - queueTime; + if (throwable != null) { + LOG.warn(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable); + + int errorCode = getErrorCode(throwable, putObjectResponse); + + s3UploadCallback.onCompletion( + uploadTask, + timeSpentMs, + throwable, + errorCode + ); + } else { + LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs)); + s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode()); + } + }); }); }