Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call executorService.submit() in MultiThreadedS3FileUploader #16

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,48 +63,50 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb

String s3Key = String.format("%s/%s", s3Prefix, subpath);
long queueTime = System.currentTimeMillis();
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(s3Bucket)
.key(s3Key)
// Changing checksum algorithm does not seem to
// have any impact regarding seeing CPU intensive
// sun/security/provider/MD5.implCompress
// that is observed in the flame graph.
//.checksumAlgorithm(ChecksumAlgorithm.CRC32_C)
.build();
CompletableFuture<PutObjectResponse> future;
String uploadPathString = String.format("s3://%s/%s", s3Bucket, s3Key);
uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback
try {
LOG.info(String.format("Submitting upload of %s --> %s", uploadTask.getAbsolutePath(), uploadPathString));
future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath());
} catch (Exception e) {
long timeSpentMs = System.currentTimeMillis() - queueTime;
LOG.warn(String.format("Caught exception during putObject for %s --> %s in %dms", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), e);
int errorCode = UPLOAD_GENERAL_ERROR_CODE;
if (Utils.isAssignableFromRecursive(e, NoSuchFileException.class)) {
errorCode = UPLOAD_FILE_NOT_FOUND_ERROR_CODE;
LOG.info(String.format("Submitting upload of %s --> %s", uploadTask.getAbsolutePath(), uploadPathString));
executorService.submit(() -> {
CompletableFuture<PutObjectResponse> future;
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(s3Bucket)
.key(s3Key)
// Changing checksum algorithm does not seem to
// have any impact regarding seeing CPU intensive
// sun/security/provider/MD5.implCompress
// that is observed in the flame graph.
//.checksumAlgorithm(ChecksumAlgorithm.CRC32_C)
.build();
uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback
try {
future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath());
} catch (Exception e) {
long timeSpentMs = System.currentTimeMillis() - queueTime;
LOG.warn(String.format("Caught exception during putObject for %s --> %s in %dms", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), e);
int errorCode = UPLOAD_GENERAL_ERROR_CODE;
if (Utils.isAssignableFromRecursive(e, NoSuchFileException.class)) {
errorCode = UPLOAD_FILE_NOT_FOUND_ERROR_CODE;
}
s3UploadCallback.onCompletion(uploadTask, timeSpentMs, e, errorCode);
return;
}
s3UploadCallback.onCompletion(uploadTask, timeSpentMs, e, errorCode);
return;
}
future.whenComplete((putObjectResponse, throwable) -> {
long timeSpentMs = System.currentTimeMillis() - queueTime;
if (throwable != null) {
LOG.warn(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable);
future.whenComplete((putObjectResponse, throwable) -> {
long timeSpentMs = System.currentTimeMillis() - queueTime;
if (throwable != null) {
LOG.warn(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable);

int errorCode = getErrorCode(throwable, putObjectResponse);
int errorCode = getErrorCode(throwable, putObjectResponse);

s3UploadCallback.onCompletion(
uploadTask,
timeSpentMs,
throwable,
errorCode
);
} else {
LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs));
s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode());
}
s3UploadCallback.onCompletion(
uploadTask,
timeSpentMs,
throwable,
errorCode
);
} else {
LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs));
s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode());
}
});
});
}

Expand Down
Loading