Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce s3 client manager and s3 uploaders #458

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ struct S3WriterConfig {
// The S3 canned access control lists (ACLs) that can be applied to uploaded objects to determine their access permissions.
// We don't set a default since some buckets don't allow setting canned ACLs.
9: optional string cannedAcl;
// The uploader class to use for uploading objects to S3.
10: optional string uploaderClass = "com.pinterest.singer.writer.s3.PutObjectUploader";
// Region of the S3 bucket.
11: optional string region = "us-east-1";
}

enum RealpinObjectType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class SingerConfigDef {
public static final String RBM_ENCODING = "encoding";
public static final String RBM_APPEND_NEW_LINE = "appendNewLine";

// S3 writer config
public static final String BUCKET = "bucket";
public static final String KEY_FORMAT = "keyFormat";
public static final String MAX_FILE_SIZE_MB = "maxFileSizeMB";
Expand All @@ -117,5 +118,7 @@ public class SingerConfigDef {
public static final String MAX_RETRIES = "maxRetries";
public static final String BUFFER_DIR = "bufferDir";
public static final String CANNED_ACL = "cannedAcl";
public static final String UPLOADER_CLASS = "uploaderClass";
public static final String REGION = "region";
public static final String NAMED_GROUP_PATTERN = "\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>";
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.pinterest.singer.thrift.configuration.TransformType;
import com.pinterest.singer.thrift.configuration.WriterType;

import com.amazonaws.regions.Regions;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -849,6 +850,25 @@ private static S3WriterConfig parseS3WriterConfig(AbstractConfiguration writerCo
if (writerConfiguration.containsKey(SingerConfigDef.BUFFER_DIR)) {
config.setBufferDir(writerConfiguration.getString(SingerConfigDef.BUFFER_DIR));
}

if (writerConfiguration.containsKey(SingerConfigDef.UPLOADER_CLASS)) {
// check if class is valid
try {
Class.forName(writerConfiguration.getString(SingerConfigDef.UPLOADER_CLASS));
} catch (ClassNotFoundException e) {
throw new ConfigurationException("Invalid uploader class: " + writerConfiguration.getString(SingerConfigDef.UPLOADER_CLASS), e);
}
config.setUploaderClass(writerConfiguration.getString(SingerConfigDef.UPLOADER_CLASS));
}

if (writerConfiguration.containsKey(SingerConfigDef.REGION)) {
try {
Regions.fromName(writerConfiguration.getString(SingerConfigDef.REGION));
} catch (IllegalArgumentException e) {
throw new ConfigurationException("Invalid region provided: " + writerConfiguration.getString(SingerConfigDef.REGION), e);
}
config.setRegion(writerConfiguration.getString(SingerConfigDef.REGION));
}
return config;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.pinterest.singer.writer.s3;

import com.pinterest.singer.thrift.configuration.S3WriterConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -10,76 +12,73 @@

import java.io.File;

public class ObjectUploaderTask {
private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class);
private final S3Client s3Client;
private final String bucket;
private final ObjectCannedACL cannedAcl;
public class PutObjectUploader extends S3Uploader {
private static final Logger LOG = LoggerFactory.getLogger(PutObjectUploader.class);
private final int maxRetries;
private static final long INITIAL_BACKOFF = 1000; // Initial backoff in milliseconds
private static final long MAX_BACKOFF = 32000; // Maximum backoff in milliseconds

public ObjectUploaderTask(S3Client s3Client, String bucket, ObjectCannedACL cannedAcl,
int maxRetries) {
this.s3Client = s3Client;
this.bucket = bucket;
this.cannedAcl = cannedAcl;
this.maxRetries = maxRetries;
public PutObjectUploader(S3WriterConfig s3WriterConfig, S3Client s3Client) {
super(s3WriterConfig, s3Client);
this.maxRetries = s3WriterConfig.getMaxRetries();
}

/**
* Uploads a file to S3 using the PutObject API.
* Uses exponential backoff with a cap for retries.
*
* @param file is the actual file in disk to be uploaded
* @param fileFormat is the key suffix
* @param s3ObjectUpload the object to upload
* @return true if the file was successfully uploaded, false otherwise
*/
public boolean upload(File file, String fileFormat) {
@Override
public boolean upload(S3ObjectUpload s3ObjectUpload) {
int attempts = 0;
boolean success = false;
long backoff = INITIAL_BACKOFF;
String s3Key = s3ObjectUpload.getKey();

while (attempts < maxRetries && !success) {
attempts++;
try {
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(bucket)
.key(fileFormat)
.key(s3Key)
.build();

if (cannedAcl != null) {
putObjectRequest = putObjectRequest.toBuilder().acl(cannedAcl).build();
}

PutObjectResponse putObjectResponse = s3Client.putObject(putObjectRequest, file.toPath());
PutObjectResponse
putObjectResponse =
s3Client.putObject(putObjectRequest, s3ObjectUpload.getFile().toPath());

if (putObjectResponse.sdkHttpResponse().isSuccessful()) {
LOG.info("Successfully uploaded file: {} on attempt {}", fileFormat, attempts);
LOG.info("Successfully uploaded file: {} on attempt {}", s3Key, attempts);
success = true;
} else {
LOG.error("Failed to upload file: {} on attempt {}", fileFormat, attempts);
LOG.error("Failed to upload file: {} on attempt {}", s3Key, attempts);
}
} catch (Exception e) {
LOG.error("Failed to upload file: {} on attempt {}. Error: {}", fileFormat, attempts, e.getMessage());
LOG.error("Failed to upload file: {} on attempt {}. Error: {}", s3Key, attempts, e.getMessage());
}

if (!success && attempts < maxRetries) {
try {
LOG.info("Failed to upload file: {} on attempt {}. Retrying in {} ms...", fileFormat, attempts, backoff);
LOG.info("Failed to upload file: {} on attempt {}. Retrying in {} ms...", s3Key, attempts, backoff);
Thread.sleep(backoff);
backoff = Math.min(backoff * 2, MAX_BACKOFF); // Exponential backoff with a cap
} catch (InterruptedException ie) {
LOG.error("Interrupted while waiting to retry file upload: {}", fileFormat, ie);
LOG.error("Interrupted while waiting to retry file upload: {}", s3Key, ie);
}
}
}

if (!success) {
// TODO: this means data loss as Singer gives up uploading the file, which is not ideal. We need a fallback mechanism.
LOG.error("Exhausted all attempts ({}) to upload file: {}", maxRetries, fileFormat);
LOG.error("Exhausted all attempts ({}) to upload file: {}", maxRetries, s3Key);
return false;
}
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.pinterest.singer.writer.s3;

import com.pinterest.singer.common.SingerMetrics;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Singleton that maintains a region -> S3Client mapping so that S3 clients are
 * created once per region and shared by all writers (multi-region support).
 *
 * <p>Thread-safe: the singleton is published through a volatile field with
 * double-checked locking, and the client cache is a ConcurrentHashMap.
 */
public class S3ClientManager {

  private static final Logger LOG = LoggerFactory.getLogger(S3ClientManager.class);

  // volatile is required for double-checked locking to be safe under the Java
  // Memory Model; without it another thread may observe a partially
  // constructed instance.
  private static volatile S3ClientManager s3UploaderManagerInstance = null;

  /** Returns the singleton instance, creating it lazily on first access. */
  public static S3ClientManager getInstance() {
    if (s3UploaderManagerInstance == null) {
      synchronized (S3ClientManager.class) {
        if (s3UploaderManagerInstance == null) {
          s3UploaderManagerInstance = new S3ClientManager();
        }
      }
    }
    return s3UploaderManagerInstance;
  }

  private final ConcurrentHashMap<String, S3Client> s3ClientMap;

  private S3ClientManager() {
    s3ClientMap = new ConcurrentHashMap<>();
  }

  /** Closes all cached clients. Intended to be called once at process shutdown. */
  public static void shutdown() {
    S3ClientManager.getInstance().closeS3Clients();
  }

  /**
   * Returns the shared S3Client for the given region, creating and caching it
   * on first use.
   *
   * @param region AWS region name, e.g. "us-east-1"
   * @return the cached (or newly built) client for that region
   */
  public S3Client get(String region) {
    // For now only the region is used as the cache key; in the future the key
    // can be built from more fields (e.g. endpoint + region).
    // computeIfAbsent guarantees at most one client is built per region even
    // under concurrent calls; the previous containsKey/put sequence could race
    // and leak an extra client.
    // We don't check whether a cached client is closed because S3 clients are
    // meant to be long-lived objects, and they are not closed by the writers.
    S3Client s3Client = s3ClientMap.computeIfAbsent(
        region,
        key -> S3Client.builder().region(Region.of(key)).build());
    OpenTsdbMetricConverter.addMetric(SingerMetrics.S3_WRITER + "num_clients", s3ClientMap.size());
    return s3Client;
  }

  private void closeS3Clients() {
    s3ClientMap.forEach((key, client) -> {
      try {
        client.close();
      } catch (Exception e) {
        LOG.error("Failed to close S3Client: {} ", key, e);
      }
    });
    // Drop the closed clients so a later get() builds a fresh client instead
    // of handing out a closed one.
    s3ClientMap.clear();
  }

  @VisibleForTesting
  public Map<String, S3Client> getS3ClientMap() {
    return s3ClientMap;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.pinterest.singer.writer.s3;

import java.io.File;

/**
 * Immutable value object pairing an S3 object key with the local file whose
 * contents should be uploaded under that key.
 */
public class S3ObjectUpload {

  private final String key;
  private final File file;

  /**
   * Creates an upload descriptor.
   *
   * @param key destination S3 object key
   * @param file local file to upload
   */
  public S3ObjectUpload(String key, File file) {
    this.key = key;
    this.file = file;
  }

  /** Returns the destination S3 object key. */
  public String getKey() {
    return key;
  }

  /** Returns the local file backing this upload. */
  public File getFile() {
    return file;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.pinterest.singer.writer.s3;

import com.pinterest.singer.thrift.configuration.S3WriterConfig;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;

/**
 * Base class for strategies that upload objects to S3.
 *
 * <p>Concrete implementations (e.g. PutObjectUploader) decide which S3 API is
 * used to perform the upload; this class holds the configuration, target
 * bucket, optional canned ACL, and the shared S3 client.
 */
public abstract class S3Uploader {
  protected S3WriterConfig s3WriterConfig;
  // Optional canned ACL applied to uploaded objects; null when the config does
  // not set one (some buckets reject canned ACLs, so there is no default).
  protected final ObjectCannedACL cannedAcl;
  protected final String bucket;
  protected S3Client s3Client;

  public S3Uploader(S3WriterConfig s3WriterConfig, S3Client s3Client) {
    this.s3WriterConfig = s3WriterConfig;
    this.bucket = s3WriterConfig.getBucket();
    // cannedAcl is optional in the thrift config; resolve it only when present
    // instead of relying on ObjectCannedACL.fromValue(null) handling null.
    String configuredAcl = s3WriterConfig.getCannedAcl();
    this.cannedAcl = configuredAcl == null ? null : ObjectCannedACL.fromValue(configuredAcl);
    this.s3Client = s3Client;
  }

  /**
   * Uploads the given S3ObjectUpload to S3.
   *
   * @param s3ObjectUpload The S3ObjectUpload to upload.
   * @return true if the upload was successful, false otherwise.
   */
  public abstract boolean upload(S3ObjectUpload s3ObjectUpload);
}
33 changes: 13 additions & 20 deletions singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public class S3Writer implements LogStreamWriter {
private final String logName;
private final String BUFFER_DIR;
private static final int BYTES_IN_MB = 1024 * 1024;
private ObjectUploaderTask putObjectUploader;
private S3Client s3Client;
private S3Uploader s3Uploader;
private final S3WriterConfig s3WriterConfig;

// S3 information
Expand Down Expand Up @@ -111,20 +110,18 @@ public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig) {

// Static factory method for testing
@VisibleForTesting
public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig, S3Client s3Client,
ObjectUploaderTask putObjectUploader, String path) {
public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig, S3Uploader s3Uploader,
String path) {
Preconditions.checkNotNull(logStream);
Preconditions.checkNotNull(s3WriterConfig);
Preconditions.checkNotNull(s3Client);
Preconditions.checkNotNull(putObjectUploader);
Preconditions.checkNotNull(s3Uploader);
Preconditions.checkNotNull(!Strings.isNullOrEmpty(path));

this.BUFFER_DIR = path;
this.logStream = logStream;
this.logName = logStream.getSingerLog().getSingerLogConfig().getName();
this.s3WriterConfig = s3WriterConfig;
this.putObjectUploader = putObjectUploader;
this.s3Client = s3Client;
this.s3Uploader = s3Uploader;
initialize();

}
Expand Down Expand Up @@ -157,13 +154,12 @@ private void initialize() {
}

try {
if (s3Client == null) {
s3Client = S3Client.builder().build();
}
if (putObjectUploader == null) {
putObjectUploader =
new ObjectUploaderTask(s3Client, bucketName,
ObjectCannedACL.fromValue(s3WriterConfig.getCannedAcl()), maxRetries);
if (s3Uploader == null) {
Class<?> clazz = Class.forName(s3WriterConfig.getUploaderClass());
s3Uploader =
(S3Uploader) clazz.getConstructor(S3WriterConfig.class, S3Client.class)
.newInstance(s3WriterConfig, S3ClientManager.getInstance()
.get(s3WriterConfig.getRegion()));
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -259,7 +255,7 @@ private void uploadDiskBufferedFileToS3() throws IOException {
try {
Files.move(bufferFile.toPath(), fileToUpload.toPath());
resetBufferFile();
if (this.putObjectUploader.upload(fileToUpload, fileFormat)) {
if (this.s3Uploader.upload(new S3ObjectUpload(fileFormat, fileToUpload))) {
OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "num_uploads", 1,
"bucket=" + bucketName, "host=" + HOSTNAME,
"logName=" + logName);
Expand Down Expand Up @@ -527,10 +523,7 @@ public void close() throws IOException {
} catch (IOException e) {
LOG.error("Failed to close buffer writers", e);
}

if (s3Client != null) {
s3Client.close();
}
// Refrain from closing the S3 client because it can be shared by other writers
}
}
}
Loading
Loading