From ff910413a772459ea6d3855bafea7c044ca6f830 Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Tue, 3 Dec 2024 16:41:10 -0800 Subject: [PATCH 1/2] introduce s3 client manager and s3 uploaders --- singer-commons/src/main/thrift/config.thrift | 4 + .../singer/common/SingerConfigDef.java | 3 + .../singer/utils/LogConfigUtils.java | 20 +++++ .../singer/writer/s3/S3ClientManager.java | 80 +++++++++++++++++++ .../singer/writer/s3/S3ObjectUpload.java | 25 ++++++ .../singer/writer/s3/S3Uploader.java | 31 +++++++ .../pinterest/singer/writer/s3/S3Writer.java | 33 +++----- .../singer/writer/s3/S3ClientManagerTest.java | 23 ++++++ .../singer/writer/{ => s3}/S3WriterTest.java | 28 +++---- 9 files changed, 210 insertions(+), 37 deletions(-) create mode 100644 singer/src/main/java/com/pinterest/singer/writer/s3/S3ClientManager.java create mode 100644 singer/src/main/java/com/pinterest/singer/writer/s3/S3ObjectUpload.java create mode 100644 singer/src/main/java/com/pinterest/singer/writer/s3/S3Uploader.java create mode 100644 singer/src/test/java/com/pinterest/singer/writer/s3/S3ClientManagerTest.java rename singer/src/test/java/com/pinterest/singer/writer/{ => s3}/S3WriterTest.java (90%) diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index f24ed750..247d0098 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -173,6 +173,10 @@ struct S3WriterConfig { // The S3 canned access control lists (ACLs) that can be applied to uploaded objects to determine their access permissions. // We don't set a default since some buckets don't allow setting canned ACLs. 9: optional string cannedAcl; + // The uploader class to use for uploading objects to S3. + 10: optional string uploaderClass = "com.pinterest.singer.writer.s3.PutObjectUploader"; + // Region of the S3 bucket. 
+ 11: optional string region = "us-east-1"; } enum RealpinObjectType { diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java index ae879f56..2532c997 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java @@ -109,6 +109,7 @@ public class SingerConfigDef { public static final String RBM_ENCODING = "encoding"; public static final String RBM_APPEND_NEW_LINE = "appendNewLine"; + // S3 writer config public static final String BUCKET = "bucket"; public static final String KEY_FORMAT = "keyFormat"; public static final String MAX_FILE_SIZE_MB = "maxFileSizeMB"; @@ -117,5 +118,7 @@ public class SingerConfigDef { public static final String MAX_RETRIES = "maxRetries"; public static final String BUFFER_DIR = "bufferDir"; public static final String CANNED_ACL = "cannedAcl"; + public static final String UPLOADER_CLASS = "uploaderClass"; + public static final String REGION = "region"; public static final String NAMED_GROUP_PATTERN = "\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>"; } \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index 00d17285..13bf8fc6 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -56,6 +56,7 @@ import com.pinterest.singer.thrift.configuration.TransformType; import com.pinterest.singer.thrift.configuration.WriterType; +import com.amazonaws.regions.Regions; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -849,6 +850,25 @@ private static S3WriterConfig parseS3WriterConfig(AbstractConfiguration writerCo if 
(writerConfiguration.containsKey(SingerConfigDef.BUFFER_DIR)) { config.setBufferDir(writerConfiguration.getString(SingerConfigDef.BUFFER_DIR)); } + + if (writerConfiguration.containsKey(SingerConfigDef.UPLOADER_CLASS)) { + // check if class is valid + try { + Class.forName(writerConfiguration.getString(SingerConfigDef.UPLOADER_CLASS)); + } catch (ClassNotFoundException e) { + throw new ConfigurationException("Invalid uploader class: " + writerConfiguration.getString(SingerConfigDef.UPLOADER_CLASS), e); + } + config.setUploaderClass(writerConfiguration.getString(SingerConfigDef.UPLOADER_CLASS)); + } + + if (writerConfiguration.containsKey(SingerConfigDef.REGION)) { + try { + Regions.fromName(writerConfiguration.getString(SingerConfigDef.REGION)); + } catch (IllegalArgumentException e) { + throw new ConfigurationException("Invalid region provided: " + writerConfiguration.getString(SingerConfigDef.REGION), e); + } + config.setRegion(writerConfiguration.getString(SingerConfigDef.REGION)); + } return config; } diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3ClientManager.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3ClientManager.java new file mode 100644 index 00000000..ab939b25 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3ClientManager.java @@ -0,0 +1,80 @@ +package com.pinterest.singer.writer.s3; + +import com.pinterest.singer.common.SingerMetrics; +import com.pinterest.singer.metrics.OpenTsdbMetricConverter; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Singleton class that holds a region -> S3Client mapping for S3 client + * reuse and multi-region support. 
+ */ +public class S3ClientManager { + + private static final Logger LOG = LoggerFactory.getLogger(S3ClientManager.class); + private static volatile S3ClientManager s3UploaderManagerInstance = null; + + static { + S3ClientManager.getInstance(); + } + + public static S3ClientManager getInstance() { + if (s3UploaderManagerInstance == null) { + synchronized (S3ClientManager.class) { + if (s3UploaderManagerInstance == null) { + s3UploaderManagerInstance = new S3ClientManager(); + } + } + } + return s3UploaderManagerInstance; + } + + private final ConcurrentHashMap<String, S3Client> s3ClientMap; + + private S3ClientManager() { + s3ClientMap = new ConcurrentHashMap<>(); + } + + public static void shutdown() { + S3ClientManager.getInstance().closeS3Clients(); + } + + public S3Client get(String region) { + // For now we only use the region as the key; in the future we can construct the key with more + // fields (e.g. endpoint + region). + String key = region; + // We don't check if the client is closed here because S3 clients are meant to be + // long-lived objects, and they are not closed by the writers. 
+ // computeIfAbsent makes the lookup-or-create step atomic, so concurrent callers asking for + // the same region share one client instead of each building one and overwriting (and + // leaking, unclosed) the other's. + S3Client s3Client = s3ClientMap.computeIfAbsent(key, + k -> S3Client.builder().region(Region.of(region)).build()); + // Report the current number of cached clients (now emitted on every lookup, not only when + // a client is created). + OpenTsdbMetricConverter.addMetric(SingerMetrics.S3_WRITER + "num_clients", s3ClientMap.size()); + return s3Client; + } + + private void closeS3Clients() { + s3ClientMap.forEach((key, client) -> { + try { + client.close(); + } catch (Exception e) { + LOG.error("Failed to close S3Client: {} ", key, e); + } + }); + } + + @VisibleForTesting + public Map<String, S3Client> getS3ClientMap() { + return s3ClientMap; + } +} diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3ObjectUpload.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3ObjectUpload.java new file mode 100644 index 00000000..3c47c653 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3ObjectUpload.java @@ -0,0 +1,25 @@ +package com.pinterest.singer.writer.s3; + +import java.io.File; + +/** + * Represents an S3 object upload. + */ +public class S3ObjectUpload { + + private final String key; + private final File file; + + public S3ObjectUpload(String key, File file) { + this.key = key; + this.file = file; + } + + public String getKey() { + return key; + } + + public File getFile() { + return file; + } +} diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Uploader.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Uploader.java new file mode 100644 index 00000000..0fff5629 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Uploader.java @@ -0,0 +1,31 @@ +package com.pinterest.singer.writer.s3; + +import com.pinterest.singer.thrift.configuration.S3WriterConfig; + +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; + +/** + * Abstract class for uploading S3 objects. 
+ */ +public abstract class S3Uploader { + protected S3WriterConfig s3WriterConfig; + protected final ObjectCannedACL cannedAcl; + protected final String bucket; + protected S3Client s3Client; + + public S3Uploader(S3WriterConfig s3WriterConfig, S3Client s3Client) { + this.s3WriterConfig = s3WriterConfig; + this.bucket = s3WriterConfig.getBucket(); + this.cannedAcl = ObjectCannedACL.fromValue(s3WriterConfig.getCannedAcl()); + this.s3Client = s3Client; + } + + /** + * Uploads the given S3ObjectUpload to S3. + * + * @param s3ObjectUpload The S3ObjectUpload to upload. + * @return true if the upload was successful, false otherwise. + */ + public abstract boolean upload(S3ObjectUpload s3ObjectUpload); +} diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java index 1673f7ec..d8e6aaf9 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java @@ -54,8 +54,7 @@ public class S3Writer implements LogStreamWriter { private final String logName; private final String BUFFER_DIR; private static final int BYTES_IN_MB = 1024 * 1024; - private ObjectUploaderTask putObjectUploader; - private S3Client s3Client; + private S3Uploader s3Uploader; private final S3WriterConfig s3WriterConfig; // S3 information @@ -111,20 +110,18 @@ public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig) { // Static factory method for testing @VisibleForTesting - public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig, S3Client s3Client, - ObjectUploaderTask putObjectUploader, String path) { + public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig, S3Uploader s3Uploader, + String path) { Preconditions.checkNotNull(logStream); Preconditions.checkNotNull(s3WriterConfig); - Preconditions.checkNotNull(s3Client); - Preconditions.checkNotNull(putObjectUploader); + 
Preconditions.checkNotNull(s3Uploader); Preconditions.checkNotNull(!Strings.isNullOrEmpty(path)); this.BUFFER_DIR = path; this.logStream = logStream; this.logName = logStream.getSingerLog().getSingerLogConfig().getName(); this.s3WriterConfig = s3WriterConfig; - this.putObjectUploader = putObjectUploader; - this.s3Client = s3Client; + this.s3Uploader = s3Uploader; initialize(); } @@ -157,13 +154,12 @@ private void initialize() { } try { - if (s3Client == null) { - s3Client = S3Client.builder().build(); - } - if (putObjectUploader == null) { - putObjectUploader = - new ObjectUploaderTask(s3Client, bucketName, - ObjectCannedACL.fromValue(s3WriterConfig.getCannedAcl()), maxRetries); + if (s3Uploader == null) { + Class clazz = Class.forName(s3WriterConfig.getUploaderClass()); + s3Uploader = + (S3Uploader) clazz.getConstructor(S3WriterConfig.class, S3Client.class) + .newInstance(s3WriterConfig, S3ClientManager.getInstance() + .get(s3WriterConfig.getRegion())); } } catch (Exception e) { throw new RuntimeException(e); @@ -259,7 +255,7 @@ private void uploadDiskBufferedFileToS3() throws IOException { try { Files.move(bufferFile.toPath(), fileToUpload.toPath()); resetBufferFile(); - if (this.putObjectUploader.upload(fileToUpload, fileFormat)) { + if (this.s3Uploader.upload(new S3ObjectUpload(fileFormat, fileToUpload))) { OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "num_uploads", 1, "bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName); @@ -527,10 +523,7 @@ public void close() throws IOException { } catch (IOException e) { LOG.error("Failed to close buffer writers", e); } - - if (s3Client != null) { - s3Client.close(); - } + // Refrain from closing the S3 client because it can be shared by other writers } } } \ No newline at end of file diff --git a/singer/src/test/java/com/pinterest/singer/writer/s3/S3ClientManagerTest.java b/singer/src/test/java/com/pinterest/singer/writer/s3/S3ClientManagerTest.java new file mode 100644 index 00000000..73816a50 --- 
/dev/null +++ b/singer/src/test/java/com/pinterest/singer/writer/s3/S3ClientManagerTest.java @@ -0,0 +1,23 @@ +package com.pinterest.singer.writer.s3; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.junit.Test; +import software.amazon.awssdk.services.s3.S3Client; + +public class S3ClientManagerTest { + + @Test + public void testCreateMultipleS3Clients() { + S3ClientManager s3ClientManager = S3ClientManager.getInstance(); + S3Client s3Client1 = s3ClientManager.get("us-east-1"); + S3Client s3Client2 = s3ClientManager.get("us-west-2"); + S3Client s3Client3 = s3ClientManager.get("us-east-1"); + S3Client s3Client4 = s3ClientManager.get("us-west-2"); + assertEquals(s3Client1, s3Client3); + assertEquals(s3Client2, s3Client4); + assertNotEquals(s3Client1, s3Client2); + assertEquals(2, s3ClientManager.getS3ClientMap().size()); + } +} diff --git a/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java similarity index 90% rename from singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java rename to singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java index 05aec40a..7210e5a0 100644 --- a/singer/src/test/java/com/pinterest/singer/writer/S3WriterTest.java +++ b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java @@ -1,4 +1,4 @@ -package com.pinterest.singer.writer; +package com.pinterest.singer.writer.s3; import com.pinterest.singer.SingerTestBase; import com.pinterest.singer.common.LogStream; @@ -8,8 +8,6 @@ import com.pinterest.singer.thrift.configuration.S3WriterConfig; import com.pinterest.singer.thrift.configuration.SingerLogConfig; import com.pinterest.singer.utils.SingerUtils; -import com.pinterest.singer.writer.s3.ObjectUploaderTask; -import com.pinterest.singer.writer.s3.S3Writer; import com.pinterest.singer.writer.s3.S3Writer.DefaultTokens; import 
org.apache.commons.io.FileUtils; @@ -38,12 +36,8 @@ @RunWith(MockitoJUnitRunner.class) public class S3WriterTest extends SingerTestBase { - - @Mock - private S3Client mockS3Client; - @Mock - private ObjectUploaderTask mockObjectUploaderTask; + private S3Uploader mockS3Uploader; private S3Writer s3Writer; private SingerLog singerLog; @@ -72,7 +66,7 @@ public void setUp() { // Initialize the S3Writer with mock dependencies s3Writer = - new S3Writer(logStream, s3WriterConfig, mockS3Client, mockObjectUploaderTask, tempPath); + new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); } @After @@ -183,7 +177,7 @@ public void testUploadToS3WhenSizeThresholdMet() throws Exception { LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null); // Mock upload behavior - when(mockObjectUploaderTask.upload(any(File.class), anyString())).thenReturn(true); + when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(true); // Write log messages to commit s3Writer.startCommit(false); @@ -193,7 +187,7 @@ public void testUploadToS3WhenSizeThresholdMet() throws Exception { s3Writer.endCommit(2, false); // Verify upload was called - verify(mockObjectUploaderTask, atLeastOnce()).upload(any(File.class), anyString()); + verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class)); } @Test @@ -204,7 +198,7 @@ public void testUploadIsScheduled() throws Exception { LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null); // Mock upload behavior - when(mockObjectUploaderTask.upload(any(File.class), anyString())).thenReturn(true); + when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(true); // Write log messages to commit s3Writer.startCommit(false); @@ -216,7 +210,7 @@ public void testUploadIsScheduled() throws Exception { s3Writer.endCommit(1, false); // Verify upload was called - verify(mockObjectUploaderTask, atLeastOnce()).upload(any(File.class), anyString()); + 
verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class)); } @Test @@ -233,7 +227,7 @@ public void testObjectKeyGeneration() { s3WriterConfig.setFilenamePattern("(?[^-]+)-(?[^.]+)\\.(?\\d+)"); s3WriterConfig.setFilenameTokens(Arrays.asList("namespace", "filename", "index")); s3Writer = - new S3Writer(logStream, s3WriterConfig, mockS3Client, mockObjectUploaderTask, tempPath); + new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); // Check key prefix String[] objectKeyParts = s3Writer.generateS3ObjectKey().split("/"); assertEquals(4, objectKeyParts.length); @@ -249,7 +243,7 @@ public void testObjectKeyGeneration() { // Custom tokens provided but filename pattern does not match s3WriterConfig.setFilenamePattern("(?[^.]+)\\.(?\\d+).0"); s3Writer = - new S3Writer(logStream, s3WriterConfig, mockS3Client, mockObjectUploaderTask, tempPath); + new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); objectKeyParts = s3Writer.generateS3ObjectKey().split("/"); assertEquals("%{namespace}", objectKeyParts[1]); keySuffixParts = objectKeyParts[3].split("\\."); @@ -262,7 +256,7 @@ public void testObjectKeyGeneration() { + "}}/%%{filename}/%{index}%/{{S}}}"; s3WriterConfig.setKeyFormat(keyFormat); s3WriterConfig.setFilenamePattern("(?[^-]+)-(?[^.]+)\\.(?\\d+)"); - s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Client, mockObjectUploaderTask, tempPath); + s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); objectKeyParts = s3Writer.generateS3ObjectKey().split("/"); assertEquals(6, objectKeyParts.length); assertEquals("%{{namespace}}", objectKeyParts[1]); @@ -294,6 +288,6 @@ public void testClose() throws Exception { File bufferFile = new File(FilenameUtils.concat(tempPath, bufferFileName)); assertTrue(!bufferFile.exists()); assertEquals(0, bufferFile.length()); - verify(mockObjectUploaderTask, atLeastOnce()).upload(any(File.class), anyString()); + verify(mockS3Uploader, 
atLeastOnce()).upload(any(S3ObjectUpload.class)); } } From 05999bf73fed55c0307708570d1a5f96bdc81dbd Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Tue, 3 Dec 2024 16:44:12 -0800 Subject: [PATCH 2/2] remove unused imports --- ...loaderTask.java => PutObjectUploader.java} | 45 +++++++++---------- .../singer/writer/s3/S3WriterTest.java | 3 -- 2 files changed, 22 insertions(+), 26 deletions(-) rename singer/src/main/java/com/pinterest/singer/writer/s3/{ObjectUploaderTask.java => PutObjectUploader.java} (69%) diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/ObjectUploaderTask.java b/singer/src/main/java/com/pinterest/singer/writer/s3/PutObjectUploader.java similarity index 69% rename from singer/src/main/java/com/pinterest/singer/writer/s3/ObjectUploaderTask.java rename to singer/src/main/java/com/pinterest/singer/writer/s3/PutObjectUploader.java index f980726f..fd21b818 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/s3/ObjectUploaderTask.java +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/PutObjectUploader.java @@ -1,5 +1,7 @@ package com.pinterest.singer.writer.s3; +import com.pinterest.singer.thrift.configuration.S3WriterConfig; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,76 +12,73 @@ import java.io.File; -public class ObjectUploaderTask { - private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class); - private final S3Client s3Client; - private final String bucket; - private final ObjectCannedACL cannedAcl; +public class PutObjectUploader extends S3Uploader { + private static final Logger LOG = LoggerFactory.getLogger(PutObjectUploader.class); private final int maxRetries; private static final long INITIAL_BACKOFF = 1000; // Initial backoff in milliseconds private static final long MAX_BACKOFF = 32000; // Maximum backoff in milliseconds - public ObjectUploaderTask(S3Client s3Client, String bucket, ObjectCannedACL cannedAcl, - int maxRetries) { - this.s3Client = s3Client; - this.bucket = 
bucket; - this.cannedAcl = cannedAcl; - this.maxRetries = maxRetries; + public PutObjectUploader(S3WriterConfig s3WriterConfig, S3Client s3Client) { + super(s3WriterConfig, s3Client); + this.maxRetries = s3WriterConfig.getMaxRetries(); } /** * Uploads a file to S3 using the PutObject API. * Uses exponential backoff with a cap for retries. * - * @param file is the actual file in disk to be uploaded - * @param fileFormat is the key suffix + * @param s3ObjectUpload the object to upload * @return true if the file was successfully uploaded, false otherwise */ - public boolean upload(File file, String fileFormat) { + @Override + public boolean upload(S3ObjectUpload s3ObjectUpload) { int attempts = 0; boolean success = false; long backoff = INITIAL_BACKOFF; + String s3Key = s3ObjectUpload.getKey(); while (attempts < maxRetries && !success) { attempts++; try { PutObjectRequest putObjectRequest = PutObjectRequest.builder() .bucket(bucket) - .key(fileFormat) + .key(s3Key) .build(); if (cannedAcl != null) { putObjectRequest = putObjectRequest.toBuilder().acl(cannedAcl).build(); } - PutObjectResponse putObjectResponse = s3Client.putObject(putObjectRequest, file.toPath()); + PutObjectResponse + putObjectResponse = + s3Client.putObject(putObjectRequest, s3ObjectUpload.getFile().toPath()); if (putObjectResponse.sdkHttpResponse().isSuccessful()) { - LOG.info("Successfully uploaded file: {} on attempt {}", fileFormat, attempts); + LOG.info("Successfully uploaded file: {} on attempt {}", s3Key, attempts); success = true; } else { - LOG.error("Failed to upload file: {} on attempt {}", fileFormat, attempts); + LOG.error("Failed to upload file: {} on attempt {}", s3Key, attempts); } } catch (Exception e) { - LOG.error("Failed to upload file: {} on attempt {}. Error: {}", fileFormat, attempts, e.getMessage()); + LOG.error("Failed to upload file: {} on attempt {}. 
Error: {}", s3Key, attempts, e.getMessage()); } if (!success && attempts < maxRetries) { try { - LOG.info("Failed to upload file: {} on attempt {}. Retrying in {} ms...", fileFormat, attempts, backoff); + LOG.info("Failed to upload file: {} on attempt {}. Retrying in {} ms...", s3Key, attempts, backoff); Thread.sleep(backoff); backoff = Math.min(backoff * 2, MAX_BACKOFF); // Exponential backoff with a cap } catch (InterruptedException ie) { - LOG.error("Interrupted while waiting to retry file upload: {}", fileFormat, ie); + LOG.error("Interrupted while waiting to retry file upload: {}", s3Key, ie); } } } if (!success) { // TODO: this means data loss as Singer gives up uploading the file, which is not ideal. We need a fallback mechanism. - LOG.error("Exhausted all attempts ({}) to upload file: {}", maxRetries, fileFormat); + LOG.error("Exhausted all attempts ({}) to upload file: {}", maxRetries, s3Key); return false; } return true; } -} \ No newline at end of file +} diff --git a/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java index 7210e5a0..6c87ae99 100644 --- a/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java +++ b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java @@ -18,7 +18,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import software.amazon.awssdk.services.s3.S3Client; import java.io.File; import java.io.IOException; @@ -28,7 +27,6 @@ import static org.junit.Assert.assertNotEquals; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -79,7 +77,6 @@ public void tearDown() throws IOException { SingerUtils.setHostname(SingerUtils.getHostname(), "-"); } - @Test public void testSanitizeFileName() { String 
fullPathPrefix = "/var/logs/app";