Skip to content

Commit

Permalink
remove unused imports
Browse files Browse the repository at this point in the history
  • Loading branch information
jfzunigac committed Dec 4, 2024
1 parent ff91041 commit 05999bf
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.pinterest.singer.writer.s3;

import com.pinterest.singer.thrift.configuration.S3WriterConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -10,76 +12,73 @@

import java.io.File;

public class ObjectUploaderTask {
private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class);
private final S3Client s3Client;
private final String bucket;
private final ObjectCannedACL cannedAcl;
public class PutObjectUploader extends S3Uploader {
private static final Logger LOG = LoggerFactory.getLogger(PutObjectUploader.class);
private final int maxRetries;
private static final long INITIAL_BACKOFF = 1000; // Initial backoff in milliseconds
private static final long MAX_BACKOFF = 32000; // Maximum backoff in milliseconds

public ObjectUploaderTask(S3Client s3Client, String bucket, ObjectCannedACL cannedAcl,
int maxRetries) {
this.s3Client = s3Client;
this.bucket = bucket;
this.cannedAcl = cannedAcl;
this.maxRetries = maxRetries;
public PutObjectUploader(S3WriterConfig s3WriterConfig, S3Client s3Client) {
super(s3WriterConfig, s3Client);
this.maxRetries = s3WriterConfig.getMaxRetries();
}

/**
* Uploads a file to S3 using the PutObject API.
* Uses exponential backoff with a cap for retries.
*
* @param file is the actual file in disk to be uploaded
* @param fileFormat is the key suffix
* @param s3ObjectUpload the object to upload
* @return true if the file was successfully uploaded, false otherwise
*/
public boolean upload(File file, String fileFormat) {
@Override
public boolean upload(S3ObjectUpload s3ObjectUpload) {
int attempts = 0;
boolean success = false;
long backoff = INITIAL_BACKOFF;
String s3Key = s3ObjectUpload.getKey();

while (attempts < maxRetries && !success) {
attempts++;
try {
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(bucket)
.key(fileFormat)
.key(s3Key)
.build();

if (cannedAcl != null) {
putObjectRequest = putObjectRequest.toBuilder().acl(cannedAcl).build();
}

PutObjectResponse putObjectResponse = s3Client.putObject(putObjectRequest, file.toPath());
PutObjectResponse
putObjectResponse =
s3Client.putObject(putObjectRequest, s3ObjectUpload.getFile().toPath());

if (putObjectResponse.sdkHttpResponse().isSuccessful()) {
LOG.info("Successfully uploaded file: {} on attempt {}", fileFormat, attempts);
LOG.info("Successfully uploaded file: {} on attempt {}", s3Key, attempts);
success = true;
} else {
LOG.error("Failed to upload file: {} on attempt {}", fileFormat, attempts);
LOG.error("Failed to upload file: {} on attempt {}", s3Key, attempts);
}
} catch (Exception e) {
LOG.error("Failed to upload file: {} on attempt {}. Error: {}", fileFormat, attempts, e.getMessage());
LOG.error("Failed to upload file: {} on attempt {}. Error: {}", s3Key, attempts, e.getMessage());
}

if (!success && attempts < maxRetries) {
try {
LOG.info("Failed to upload file: {} on attempt {}. Retrying in {} ms...", fileFormat, attempts, backoff);
LOG.info("Failed to upload file: {} on attempt {}. Retrying in {} ms...", s3Key, attempts, backoff);
Thread.sleep(backoff);
backoff = Math.min(backoff * 2, MAX_BACKOFF); // Exponential backoff with a cap
} catch (InterruptedException ie) {
LOG.error("Interrupted while waiting to retry file upload: {}", fileFormat, ie);
LOG.error("Interrupted while waiting to retry file upload: {}", s3Key, ie);
}
}
}

if (!success) {
// TODO: this means data loss as Singer gives up uploading the file, which is not ideal. We need a fallback mechanism.
LOG.error("Exhausted all attempts ({}) to upload file: {}", maxRetries, fileFormat);
LOG.error("Exhausted all attempts ({}) to upload file: {}", maxRetries, s3Key);
return false;
}
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.File;
import java.io.IOException;
Expand All @@ -28,7 +27,6 @@

import static org.junit.Assert.assertNotEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -79,7 +77,6 @@ public void tearDown() throws IOException {
SingerUtils.setHostname(SingerUtils.getHostname(), "-");
}


@Test
public void testSanitizeFileName() {
String fullPathPrefix = "/var/logs/app";
Expand Down

0 comments on commit 05999bf

Please sign in to comment.