Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change buffer file naming #459

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 20 additions & 30 deletions singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.regex.Pattern;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;

/**
* A LogStreamWriter for Singer that writes to S3 (writer.type=s3).
Expand Down Expand Up @@ -68,7 +67,6 @@ public class S3Writer implements LogStreamWriter {
// Custom Thresholds
private int maxFileSizeMB;
private int minUploadTime;
private int maxRetries;
private Pattern filenamePattern;
private List<String> fileNameTokens = new ArrayList<>();
private boolean filenameParsingEnabled = false;
Expand Down Expand Up @@ -129,8 +127,6 @@ public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig, S3Uploader s
private void initialize() {
this.maxFileSizeMB = s3WriterConfig.getMaxFileSizeMB();
this.minUploadTime = s3WriterConfig.getMinUploadTimeInSeconds();
this.maxRetries = s3WriterConfig.getMaxRetries();

if (s3WriterConfig.isSetFilenamePattern() && s3WriterConfig.isSetFilenameTokens()) {
this.filenameParsingEnabled = true;
this.filenamePattern = Pattern.compile(s3WriterConfig.getFilenamePattern());
Expand Down Expand Up @@ -182,13 +178,16 @@ public boolean isCommittableWriter() {
}

/**
* Takes the fullPathPrefix and removes all slashes and replaces them with underscores.
* Get or construct buffer file name based on the log stream name.
* The buffer file naming convention is "log_name.dir_name.file_name.buffer".
*
* @return the buffer file name
*/
public static String sanitizeFileName(String fullPathPrefix) {
if (fullPathPrefix.startsWith("/")) {
fullPathPrefix = fullPathPrefix.substring(1);
}
return fullPathPrefix.replace("/", "_");
public String getBufferFileName() {
return (bufferFile != null) ? bufferFile.getName()
: logName + "." + logStream.getLogDir().substring(1)
.replace("/", "_") + "."
+ logStream.getLogStreamName() + ".buffer";
}

/**
Expand All @@ -203,13 +202,13 @@ public static String sanitizeFileName(String fullPathPrefix) {
public synchronized void startCommit(boolean isDraining) throws LogStreamWriterException {
try {
if (!bufferFile.exists()) {
bufferFile.createNewFile();
resetBufferFile();
}

bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));

} catch (IOException e) {
throw new RuntimeException("Failed to create buffer file: " + bufferFile.getName(), e);
throw new RuntimeException("Failed to create buffer file: " + getBufferFileName(), e);
}
if (uploadFuture == null) {
scheduleUploadTask();
Expand Down Expand Up @@ -250,7 +249,7 @@ private void scheduleUploadTask() {
private void uploadDiskBufferedFileToS3() throws IOException {
File
fileToUpload =
new File(BUFFER_DIR, bufferFile.getName() + "." + FORMATTER.format(new Date()));
new File(BUFFER_DIR, getBufferFileName() + "." + FORMATTER.format(new Date()));
String fileFormat = generateS3ObjectKey();
try {
Files.move(bufferFile.toPath(), fileToUpload.toPath());
Expand All @@ -266,7 +265,7 @@ private void uploadDiskBufferedFileToS3() throws IOException {
}
fileToUpload.delete();
} catch (IOException e) {
LOG.error("Failed to rename buffer file " + bufferFile.getName(), e);
LOG.error("Failed to rename buffer file " + getBufferFileName(), e);
}
}

Expand Down Expand Up @@ -318,24 +317,15 @@ private void writeMessageToBuffer(LogMessageAndPosition logMessageAndPosition)
* @throws IOException
*/
private void resetBufferFile() throws IOException {
String
bufferFileName =
sanitizeFileName(logStream.getFullPathPrefix()) + ".buffer." + UUID.randomUUID()
.toString().substring(0, 8);
bufferFile = new File(BUFFER_DIR, bufferFileName);
bufferFile.createNewFile();
bufferFile = new File(BUFFER_DIR, getBufferFileName());
if (!bufferFile.createNewFile()) {
LOG.info(
"Buffer file for log stream {} already exists, continue with existing buffer file: {}",
logName, getBufferFileName());
}
bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));
}

/**
* Helper function to get the remaining part of the host name after the cluster prefix,
* typically a UUID.
*/
public static String extractHostSuffix(String inputStr) {
String[] parts = inputStr.split("-");
return parts[parts.length - 1];
}

private Matcher extractTokensFromFilename(String logFileName) {
Matcher matcher = filenamePattern.matcher(logFileName);
if (!matcher.matches()) {
Expand Down Expand Up @@ -507,7 +497,7 @@ public void close() throws IOException {
uploadDiskBufferedFileToS3();
bufferFile.delete();
} catch (IOException e) {
LOG.error("Failed to close bufferedWriter or upload buffer file: " + bufferFile.getName(),
LOG.error("Failed to close bufferedWriter or upload buffer file: " + getBufferFileName(),
e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public void setUp() {
s3WriterConfig.setMaxRetries(3);

// Initialize the S3Writer with mock dependencies
s3Writer =
new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
}

@After
Expand All @@ -75,62 +74,7 @@ public void tearDown() throws IOException {
}
// reset hostname
SingerUtils.setHostname(SingerUtils.getHostname(), "-");
}

@Test
public void testSanitizeFileName() {
String fullPathPrefix = "/var/logs/app";
String expected = "var_logs_app";
String result = s3Writer.sanitizeFileName(fullPathPrefix);
assertEquals(expected, result);

fullPathPrefix = "var/logs/app";
expected = "var_logs_app";
result = s3Writer.sanitizeFileName(fullPathPrefix);
assertEquals(expected, result);

fullPathPrefix = "/var/logs/app/";
expected = "var_logs_app_";
result = s3Writer.sanitizeFileName(fullPathPrefix);
assertEquals(expected, result);

fullPathPrefix = "/";
expected = "";
result = s3Writer.sanitizeFileName(fullPathPrefix);
assertEquals(expected, result);

fullPathPrefix = "";
expected = "";
result = s3Writer.sanitizeFileName(fullPathPrefix);
assertEquals(expected, result);
}

@Test
public void testExtractHostSuffix() {
String hostname = "app-server-12345";
String expected = "12345";
String result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);

hostname = "app-12345";
expected = "12345";
result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);

hostname = "12345";
expected = "12345";
result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);

hostname = "app-server";
expected = "server";
result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);

hostname = "";
expected = "";
result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);
s3Writer.close();
}

@Test
Expand All @@ -146,21 +90,8 @@ public void testWriteLogMessageToCommit() throws Exception {
s3Writer.endCommit(1, false);

// Verify that the messages are written to the buffer file
String
bufferFileNamePrefix =
s3Writer.sanitizeFileName(logStream.getFullPathPrefix()) + ".buffer.";
File tmpDir = new File(tempPath);
File bufferFile = null;
File [] tmpFiles = tmpDir.listFiles();
boolean bufferFileExists = false;
for (File file : tmpFiles) {
if (file.getName().startsWith(bufferFileNamePrefix)) {
bufferFileExists = true;
bufferFile = file;
break;
}
}
assertTrue(bufferFileExists);
File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
assertTrue(bufferFile.exists());
String content = new String(Files.readAllBytes(bufferFile.toPath()));
assertTrue(content.contains("test message"));
}
Expand Down Expand Up @@ -210,6 +141,37 @@ public void testUploadIsScheduled() throws Exception {
verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
}

@Test
public void testResumeFromExistingBufferFile() throws Exception {
// Prepare log message
ByteBuffer messageBuffer = ByteBuffer.wrap("This is message 1 :".getBytes());
LogMessage logMessage = new LogMessage(messageBuffer);
LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);

// Write log message to commit
s3Writer.startCommit(false);
s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);

// Create a new S3Writer with the same buffer file and write another message to simulate resuming
S3Writer
newS3Writer =
new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
messageBuffer = ByteBuffer.wrap(" This is message 2".getBytes());
logMessage = new LogMessage(messageBuffer);
logMessageAndPosition = new LogMessageAndPosition(logMessage, null);

// Write log message to commit
newS3Writer.startCommit(false);
newS3Writer.writeLogMessageToCommit(logMessageAndPosition, false);

// Verify that the messages are written to the buffer file
File bufferFile = new File(tempPath + "/" + newS3Writer.getBufferFileName());
assertTrue(bufferFile.exists());
String content = new String(Files.readAllBytes(bufferFile.toPath()));
assertTrue(content.contains("This is message 1 : This is message 2"));
newS3Writer.close();
}

@Test
public void testObjectKeyGeneration() {
// Custom and default tokens used
Expand All @@ -223,24 +185,25 @@ public void testObjectKeyGeneration() {
s3WriterConfig.setBucket("bucket-name");
s3WriterConfig.setFilenamePattern("(?<namespace>[^-]+)-(?<filename>[^.]+)\\.(?<index>\\d+)");
s3WriterConfig.setFilenameTokens(Arrays.asList("namespace", "filename", "index"));
s3Writer =
new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);

// Check key prefix
String[] objectKeyParts = s3Writer.generateS3ObjectKey().split("/");
assertEquals(4, objectKeyParts.length);
assertEquals("my-path", objectKeyParts[0]);
assertEquals("my_namespace", objectKeyParts[1]);
assertEquals(logStream.getSingerLog().getSingerLogConfig().getName(), objectKeyParts[2]);

// Check last part of object key
String[] keySuffixParts = objectKeyParts[3].split("\\.");
assertEquals(3, keySuffixParts.length);
assertEquals("test_log-0", keySuffixParts[0]);
assertNotEquals("{{S}}", keySuffixParts[1]);
assertEquals(2, keySuffixParts[1].length());

// Custom tokens provided but filename pattern does not match
s3WriterConfig.setFilenamePattern("(?<filename>[^.]+)\\.(?<index>\\d+).0");
s3Writer =
new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
objectKeyParts = s3Writer.generateS3ObjectKey().split("/");
assertEquals("%{namespace}", objectKeyParts[1]);
keySuffixParts = objectKeyParts[3].split("\\.");
Expand Down Expand Up @@ -280,10 +243,9 @@ public void testClose() throws Exception {

// Verify that the buffer file was correctly handled
String
bufferFileName =
s3Writer.sanitizeFileName(logStream.getFullPathPrefix()) + ".buffer.log";
bufferFileName = s3Writer.getBufferFileName();
File bufferFile = new File(FilenameUtils.concat(tempPath, bufferFileName));
assertTrue(!bufferFile.exists());
assertFalse(bufferFile.exists());
assertEquals(0, bufferFile.length());
verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
}
Expand Down
Loading