diff --git a/pom.xml b/pom.xml
index 655ab140..2d0876a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
4.0.0
com.pinterest.singer
singer-package
- 0.8.0.79
+ 0.8.0.80
pom
Singer Logging Agent modules
2013
diff --git a/singer-commons/pom.xml b/singer-commons/pom.xml
index 65c3753d..24c3ec7d 100644
--- a/singer-commons/pom.xml
+++ b/singer-commons/pom.xml
@@ -19,7 +19,7 @@
com.pinterest.singer
singer-package
- 0.8.0.79
+ 0.8.0.80
../pom.xml
diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift
index 42d0bfb1..6dc5f7e0 100644
--- a/singer-commons/src/main/thrift/config.thrift
+++ b/singer-commons/src/main/thrift/config.thrift
@@ -422,4 +422,9 @@ struct SingerConfig {
*/
27: optional string fsEventQueueImplementation;
+ /**
+ * Hostname Prefix regex pattern
+ */
+ 28: optional string hostnamePrefixRegex = "-";
+
}
diff --git a/singer/pom.xml b/singer/pom.xml
index 3a5512c2..d6da3919 100644
--- a/singer/pom.xml
+++ b/singer/pom.xml
@@ -7,7 +7,7 @@
com.pinterest.singer
singer-package
- 0.8.0.79
+ 0.8.0.80
../pom.xml
diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
index f674e1ac..f9698678 100644
--- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
+++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
@@ -61,6 +61,8 @@ public class SingerMetrics {
public static final String PROCESSOR_MESSAGE_KEY_SIZE_BYTES = "processor.message.key.size.bytes";
public static final String PROCESSOR_MESSAGE_VALUE_SIZE_BYTES = "processor.message.value.size.bytes";
+ public static final String DISABLE_DECIDER_ACTIVE = "singer.processor.disable_decider_active";
+
public static final String SKIPPED_BYTES = "singer.reader.skipped_bytes";
public static final String WATERMARK_CREATION_FAILURE = "singer.watermark.creation.failure";
diff --git a/singer/src/main/java/com/pinterest/singer/config/Decider.java b/singer/src/main/java/com/pinterest/singer/config/Decider.java
index 5a07e445..29ae3c30 100644
--- a/singer/src/main/java/com/pinterest/singer/config/Decider.java
+++ b/singer/src/main/java/com/pinterest/singer/config/Decider.java
@@ -15,7 +15,9 @@
*/
package com.pinterest.singer.config;
+import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.utils.HashUtils;
+import com.pinterest.singer.utils.SingerUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
@@ -29,9 +31,14 @@
import java.io.IOException;
import java.math.BigInteger;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Basic Decider Framework.
@@ -103,6 +110,22 @@ public Map getDeciderMap() {
return mDeciderMap;
}
+ /***
+ * Given a log name, return a list of possible decider names used to disable the log. The disable decider
+ * name is required to be in the format of "singer_disable_logName___HOSTNAMEPREFIX___decider".
+ *
+ * @param logName
+ * @return a list of disable deciders
+ */
+ public List generateDisableDeciders(String logName) {
+ List disableDeciderList = new ArrayList<>();
+ for (int i = SingerUtils.HOSTNAME_PREFIXES.size() - 1; i >= 0; i--) {
+ String convertedHostname = SingerUtils.HOSTNAME_PREFIXES.get(i).replaceAll("[^a-zA-Z0-9]", "_");
+ disableDeciderList.add("singer_disable_" + logName.replaceAll("[^a-zA-Z0-9]", "_") + "___" + convertedHostname + "___decider");
+ }
+ return disableDeciderList;
+ }
+
/**
* Looks up the value of the decider variable named {@code deciderName} and flips a coin to
* determine if we should be in the experiment based on the specified ID. Useful if a stable
diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java
index c2aada34..be706265 100644
--- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java
+++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java
@@ -68,6 +68,9 @@ public class DefaultLogStreamProcessor implements LogStreamProcessor, Runnable {
// Decider for the log stream.
private final String logDecider;
+ // Valid deciders that can be used in conjunction with logDecider to disable the logstream at a fleet level
+ private final List disableDeciders;
+
// LogStream to be processed.
protected final LogStream logStream;
@@ -165,6 +168,9 @@ public DefaultLogStreamProcessor(
this.exceedTimeSliceLimit = false;
this.lastModificationTimeProcessed = new AtomicLong(-1);
this.lastCompletedCycleTime = new AtomicLong(-1);
+ this.disableDeciders =
+ Decider.getInstance().generateDisableDeciders(
+ this.logStream.getSingerLog().getSingerLogConfig().getName());
}
@Override
@@ -241,7 +247,8 @@ public long processLogStream() throws LogStreamProcessorException {
/**
* If the decider is not set, this method will return true.
- * If a decider is set, only return false when the decider's value is 0.
+ * If a decider is set, return false when the decider's value is 0
+ * or disable decider's (if exists) value is 100.
*
* @return true or false.
*/
@@ -252,6 +259,18 @@ boolean isLoggingAllowedByDecider() {
if (map.containsKey(logDecider)) {
result = map.get(logDecider) != 0;
}
+ if (result && disableDeciders != null) {
+ for (String disableDecider : disableDeciders) {
+ if (map.containsKey(disableDecider) && map.get(disableDecider) == 100) {
+ LOG.info("Disabling log stream {} because fleet disable decider {} is set to 100",
+ logStream.getLogStreamName(), disableDecider);
+ OpenTsdbMetricConverter.gauge(
+ SingerMetrics.DISABLE_DECIDER_ACTIVE, 1, "log=" + logStream.getSingerLog().getLogName());
+ result = false;
+ break;
+ }
+ }
+ }
}
return result;
}
diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
index 9a481a2f..4756c1c1 100644
--- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
+++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
@@ -34,10 +34,13 @@
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
+import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.io.comparator.NameFileComparator;
@@ -75,6 +78,7 @@ public class SingerUtils {
public static final FileSystem defaultFileSystem = FileSystems.getDefault();
public static String HOSTNAME = getHostname();
+ public static List HOSTNAME_PREFIXES = getHostnamePrefixes("-");
public static String getHostname() {
String hostName;
@@ -91,6 +95,28 @@ public static String getHostname() {
return hostName;
}
+ /***
+ * Gradually builds substrings separated by dashes from hostname given a regex,
+ * will return hostname if hostname can't be split by regex
+ *
+ * @param
+ * @return a list of hostname prefixes
+ */
+ public static List getHostnamePrefixes(String regex) {
+ if (regex == null || regex.isEmpty()) {
+ return Arrays.asList(HOSTNAME);
+ }
+ List hostPrefixes = new ArrayList<>();
+ String [] splitHostname = HOSTNAME.split(regex);
+ StringBuilder currentPrefix = new StringBuilder();
+ for (String prefix : splitHostname) {
+ currentPrefix.append(prefix);
+ hostPrefixes.add(currentPrefix.toString());
+ currentPrefix.append("-");
+ }
+ return hostPrefixes;
+ }
+
public static Path getPath(String filePathStr) {
return defaultFileSystem.getPath(filePathStr);
}
@@ -230,6 +256,7 @@ public static SingerConfig loadSingerConfig(String singerConfigDir,
}
LOG.info("Singer config loaded : " + singerConfig);
+ HOSTNAME_PREFIXES = getHostnamePrefixes(singerConfig.getHostnamePrefixRegex());
return singerConfig;
}
@@ -343,5 +370,10 @@ public static void deleteRecursively(File baseDir) {
}
}
}
-
+ @VisibleForTesting
+ public static void setHostname(String hostname, String regex) {
+ HOSTNAME = hostname;
+ HOSTNAME_PREFIXES = getHostnamePrefixes(regex);
+ }
+
}
diff --git a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java
index a2aa2719..3bbd5c46 100644
--- a/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java
+++ b/singer/src/test/java/com/pinterest/singer/processor/DefaultLogStreamProcessorTest.java
@@ -37,6 +37,7 @@
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
import com.pinterest.singer.thrift.configuration.ThriftReaderConfig;
import com.pinterest.singer.utils.SimpleThriftLogger;
+import com.pinterest.singer.utils.SingerUtils;
import com.pinterest.singer.utils.WatermarkUtils;
import com.google.common.collect.ImmutableMap;
@@ -47,6 +48,7 @@
import java.io.IOException;
import java.io.File;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
public class DefaultLogStreamProcessorTest extends com.pinterest.singer.SingerTestBase {
@@ -307,6 +309,48 @@ public void testProcessLogStreamWithDecider() throws Exception {
}
}
+ @Test
+ public void testDisableDecider() throws Exception {
+ DefaultLogStreamProcessor processor = null;
+ SingerUtils.setHostname("localhost-prod.cluster-19970722", "[.-]");
+ try {
+ SingerConfig singerConfig = new SingerConfig();
+ singerConfig.setThreadPoolSize(1);
+ singerConfig.setWriterThreadPoolSize(1);
+ SingerSettings.initialize(singerConfig);
+ SingerLog singerLog = new SingerLog(
+ new SingerLogConfig("test", getTempPath(), "thrift.log", null, null, null));
+ LogStream logStream = new LogStream(singerLog, "thrift.log");
+ NoOpLogStreamWriter writer = new NoOpLogStreamWriter();
+ processor = new DefaultLogStreamProcessor(
+ logStream,
+ "singer_test_decider",
+ new DefaultLogStreamReader(
+ logStream,
+ new ThriftLogFileReaderFactory(new ThriftReaderConfig(16000, 16000))),
+ writer,
+ 50, 1, 1, 3600, 1800);
+ Decider.setInstance(new HashMap<>());
+ Decider.getInstance().getDeciderMap().put("singer_test_decider", 100);
+ assertEquals(true, processor.isLoggingAllowedByDecider());
+
+ Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 100);
+ assertEquals(false, processor.isLoggingAllowedByDecider());
+
+ Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost___decider", 50);
+ Decider.getInstance().getDeciderMap().put("singer_disable_test___localhost_prod_cluster___decider", 100);
+ assertEquals(false, processor.isLoggingAllowedByDecider());
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ } finally {
+ if (processor != null) {
+ processor.close();
+ }
+ }
+ SingerUtils.setHostname(SingerUtils.getHostname(), "-");
+ }
private static List getMessages(List messageAndPositions) {
List messages = Lists.newArrayListWithExpectedSize(messageAndPositions.size());
for (LogMessageAndPosition messageAndPosition : messageAndPositions) {
diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java
index d4bc7464..6f89ca20 100644
--- a/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java
+++ b/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java
@@ -16,6 +16,7 @@
package com.pinterest.singer.utils;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import org.junit.Test;
@@ -25,6 +26,8 @@
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
+import java.util.List;
+
public class TestSingerUtils {
@Test
@@ -53,4 +56,45 @@ public void testGetHostNameBasedOnConfig() {
assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig);
}
+ @Test
+ public void testGetHostnamePrefixes() {
+ // Check simple dashes only
+ String regex = "-";
+ SingerUtils.setHostname("localhost-prod-cluster-19970722", regex);
+ String [] prefixes = {"localhost", "localhost-prod", "localhost-prod-cluster", "localhost-prod-cluster-19970722"};
+ List finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
+ assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));
+
+ // Check dots and dashes
+ regex = "[.-]";
+ SingerUtils.setHostname("localhost-prod.cluster-19970722", regex);
+ prefixes = new String[]{"localhost", "localhost-prod", "localhost-prod-cluster", "localhost-prod-cluster-19970722"};
+ finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
+ assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));
+
+ // Check regex is empty
+ regex = "";
+ SingerUtils.setHostname("localhost-dev.19970722", regex);
+ prefixes = new String []{"localhost-dev.19970722"};
+ finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
+ assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));
+
+ // Check regex is null
+ regex = null;
+ SingerUtils.setHostname("localhost-dev.19970722", regex);
+ prefixes = new String []{"localhost-dev.19970722"};
+ finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
+ assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));
+
+ // Check regex is not matched
+ regex = "abc";
+ SingerUtils.setHostname("localhost-dev.19970722", regex);
+ prefixes = new String []{"localhost-dev.19970722"};
+ finalPrefixes = SingerUtils.getHostnamePrefixes(regex);
+ assertTrue(finalPrefixes.equals(java.util.Arrays.asList(prefixes)));
+
+ // reset hostname
+ SingerUtils.setHostname(SingerUtils.getHostname(), "-");
+ }
+
}
diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml
index 738b32b7..3fc3e210 100644
--- a/thrift-logger/pom.xml
+++ b/thrift-logger/pom.xml
@@ -4,7 +4,7 @@
com.pinterest.singer
singer-package
- 0.8.0.79
+ 0.8.0.80
../pom.xml
thrift-logger