Skip to content

Commit

Permalink
Merge branch 'pinterest:master' into watermark_tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
jfzunigac authored Feb 2, 2024
2 parents e57b1e3 + ca0a01f commit 5a7a4c4
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.79</version>
<version>0.8.0.80</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
Expand Down
2 changes: 1 addition & 1 deletion singer-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.79</version>
<version>0.8.0.80</version>
<relativePath>../pom.xml</relativePath>
</parent>
<developers>
Expand Down
5 changes: 5 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,9 @@ struct SingerConfig {
*/
27: optional string fsEventQueueImplementation;

/**
* Hostname Prefix regex pattern
*/
28: optional string hostnamePrefixRegex = "-";

}
2 changes: 1 addition & 1 deletion singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.79</version>
<version>0.8.0.80</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class SingerMetrics {
public static final String PROCESSOR_MESSAGE_KEY_SIZE_BYTES = "processor.message.key.size.bytes";
public static final String PROCESSOR_MESSAGE_VALUE_SIZE_BYTES = "processor.message.value.size.bytes";

public static final String DISABLE_DECIDER_ACTIVE = "singer.processor.disable_decider_active";

public static final String SKIPPED_BYTES = "singer.reader.skipped_bytes";

public static final String WATERMARK_CREATION_FAILURE = "singer.watermark.creation.failure";
Expand Down
23 changes: 23 additions & 0 deletions singer/src/main/java/com/pinterest/singer/config/Decider.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package com.pinterest.singer.config;

import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.utils.HashUtils;
import com.pinterest.singer.utils.SingerUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand All @@ -29,9 +31,14 @@

import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Basic Decider Framework.
Expand Down Expand Up @@ -103,6 +110,22 @@ public Map<String, Integer> getDeciderMap() {
return mDeciderMap;
}

/***
* Given a log name, return a list of possible decider names used to disable the log. The disable decider
* name is required to be in the format of "singer_disable_logName___HOSTNAMEPREFIX___decider".
*
* @param logName
* @return a list of disable deciders
*/
public List<String> generateDisableDeciders(String logName) {
List<String> disableDeciderList = new ArrayList<>();
for (int i = SingerUtils.HOSTNAME_PREFIXES.size() - 1; i >= 0; i--) {
String convertedHostname = SingerUtils.HOSTNAME_PREFIXES.get(i).replaceAll("[^a-zA-Z0-9]", "_");
disableDeciderList.add("singer_disable_" + logName.replaceAll("[^a-zA-Z0-9]", "_") + "___" + convertedHostname + "___decider");
}
return disableDeciderList;
}

/**
* Looks up the value of the decider variable named {@code deciderName} and flips a coin to
* determine if we should be in the experiment based on the specified ID. Useful if a stable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable {
// Decider for the log stream.
private final String logDecider;

// Valid deciders that can be used in conjunction with logDecider to disable the logstream at a fleet level
private final List<String> disableDeciders;

// LogStream to be processed.
protected final LogStream logStream;

Expand Down Expand Up @@ -165,6 +168,9 @@ public DefaultLogStreamProcessor(
this.exceedTimeSliceLimit = false;
this.lastModificationTimeProcessed = new AtomicLong(-1);
this.lastCompletedCycleTime = new AtomicLong(-1);
this.disableDeciders =
Decider.getInstance().generateDisableDeciders(
this.logStream.getSingerLog().getSingerLogConfig().getName());
}

@Override
Expand Down Expand Up @@ -241,7 +247,8 @@ public long processLogStream() throws LogStreamProcessorException {

/**
* If the decider is not set, this method will return true.
* If a decider is set, only return false when the decider's value is 0.
* If a decider is set, return false when the decider's value is 0
* or disable decider's (if exists) value is 100.
*
* @return true or false.
*/
Expand All @@ -252,6 +259,18 @@ boolean isLoggingAllowedByDecider() {
if (map.containsKey(logDecider)) {
result = map.get(logDecider) != 0;
}
if (result && disableDeciders != null) {
for (String disableDecider : disableDeciders) {
if (map.containsKey(disableDecider) && map.get(disableDecider) == 100) {
LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100",
logStream.getLogStreamName(), disableDecider);
OpenTsdbMetricConverter.gauge(
SingerMetrics.DISABLE_DECIDER_ACTIVE, 1, "log=" + logStream.getSingerLog().getLogName());
result = false;
break;
}
}
}
}
return result;
}
Expand Down
34 changes: 33 additions & 1 deletion singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.io.comparator.NameFileComparator;
Expand Down Expand Up @@ -75,6 +78,7 @@ public class SingerUtils {

public static final FileSystem defaultFileSystem = FileSystems.getDefault();
public static String HOSTNAME = getHostname();
public static List<String> HOSTNAME_PREFIXES = getHostnamePrefixes("-");

public static String getHostname() {
String hostName;
Expand All @@ -91,6 +95,28 @@ public static String getHostname() {
return hostName;
}

/***
* Gradually builds substrings separated by dashes from hostname given a regex,
* will return hostname if hostname can't be split by regex
*
* @param
* @return a list of hostname prefixes
*/
public static List<String> getHostnamePrefixes(String regex) {
if (regex == null || regex.isEmpty()) {
return Arrays.asList(HOSTNAME);
}
List<String> hostPrefixes = new ArrayList<>();
String [] splitHostname = HOSTNAME.split(regex);
StringBuilder currentPrefix = new StringBuilder();
for (String prefix : splitHostname) {
currentPrefix.append(prefix);
hostPrefixes.add(currentPrefix.toString());
currentPrefix.append("-");
}
return hostPrefixes;
}

public static Path getPath(String filePathStr) {
return defaultFileSystem.getPath(filePathStr);
}
Expand Down Expand Up @@ -230,6 +256,7 @@ public static SingerConfig loadSingerConfig(String singerConfigDir,
}

LOG.info("Singer config loaded : " + singerConfig);
HOSTNAME_PREFIXES = getHostnamePrefixes(singerConfig.getHostnamePrefixRegex());
return singerConfig;
}

Expand Down Expand Up @@ -343,5 +370,10 @@ public static void deleteRecursively(File baseDir) {
}
}
}

@VisibleForTesting
public static void setHostname(String hostname, String regex) {
HOSTNAME = hostname;
HOSTNAME_PREFIXES = getHostnamePrefixes(regex);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
import com.pinterest.singer.thrift.configuration.ThriftReaderConfig;
import com.pinterest.singer.utils.SimpleThriftLogger;
import com.pinterest.singer.utils.SingerUtils;
import com.pinterest.singer.utils.WatermarkUtils;

import com.google.common.collect.ImmutableMap;
Expand All @@ -47,6 +48,7 @@
import java.io.IOException;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class DefaultLogStreamProcessorTest extends com.pinterest.singer.SingerTestBase {
Expand Down Expand Up @@ -307,6 +309,48 @@ public void testProcessLogStreamWithDecider() throws Exception {
}
}

@Test
public void testDisableDecider() throws Exception {
DefaultLogStreamProcessor processor = null;
SingerUtils.setHostname("localhost-prod.cluster-19970722", "[.-]");
try {
SingerConfig singerConfig = new SingerConfig();
singerConfig.setThreadPoolSize(1);
singerConfig.setWriterThreadPoolSize(1);
SingerSettings.initialize(singerConfig);
SingerLog singerLog = new SingerLog(
new SingerLogConfig("test", getTempPath(), "thrift.log", null, null, null));
LogStream logStream = new LogStream(singerLog, "thrift.log");
NoOpLogStreamWriter writer = new NoOpLogStreamWriter();
processor = new DefaultLogStreamProcessor(
logStream,
"singer_test_decider",
new DefaultLogStreamReader(
logStream,
new ThriftLogFileReaderFactory(new ThriftReaderConfig(16000, 16000))),
writer,
50, 1, 1, 3600, 1800);
Decider.setInstance(new HashMap<>());
Decider.getInstance().getDeciderMap().put("singer_test_decider", 100);
assertEquals(true, processor.isLoggingAllowedByDecider());

Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 100);
assertEquals(false, processor.isLoggingAllowedByDecider());

Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 50);
Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost_prod_cluster___decider", 100);
assertEquals(false, processor.isLoggingAllowedByDecider());

} catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
} finally {
if (processor != null) {
processor.close();
}
}
SingerUtils.setHostname(SingerUtils.getHostname(), "-");
}
private static List<LogMessage> getMessages(List<LogMessageAndPosition> messageAndPositions) {
List<LogMessage> messages = Lists.newArrayListWithExpectedSize(messageAndPositions.size());
for (LogMessageAndPosition messageAndPosition : messageAndPositions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.pinterest.singer.utils;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

Expand All @@ -25,6 +26,8 @@
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.thrift.configuration.SingerLogConfig;

import java.util.List;

public class TestSingerUtils {

@Test
Expand Down Expand Up @@ -53,4 +56,45 @@ public void testGetHostNameBasedOnConfig() {
assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig);
}

@Test
public void testGetHostnamePrefixes() {
// Check simple dashes only
String regex = "-";
SingerUtils.setHostname("localhost-prod-cluster-19970722", regex);
String [] prefixes = {"localhost", "localhost-prod", "localhost-prod-cluster", "localhost-prod-cluster-19970722"};
List<String> finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));

// Check dots and dashes
regex = "[.-]";
SingerUtils.setHostname("localhost-prod.cluster-19970722", regex);
prefixes = new String[]{"localhost", "localhost-prod", "localhost-prod-cluster", "localhost-prod-cluster-19970722"};
finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));

// Check regex is empty
regex = "";
SingerUtils.setHostname("localhost-dev.19970722", regex);
prefixes = new String []{"localhost-dev.19970722"};
finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));

// Check regex is null
regex = null;
SingerUtils.setHostname("localhost-dev.19970722", regex);
prefixes = new String []{"localhost-dev.19970722"};
finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));

// Check regex is not matched
regex = "abc";
SingerUtils.setHostname("localhost-dev.19970722", regex);
prefixes = new String []{"localhost-dev.19970722"};
finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));

// reset hostname
SingerUtils.setHostname(SingerUtils.getHostname(), "-");
}

}
2 changes: 1 addition & 1 deletion thrift-logger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.79</version>
<version>0.8.0.80</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>thrift-logger</artifactId>
Expand Down

0 comments on commit 5a7a4c4

Please sign in to comment.