From 83bfce81f8d8026e227434668ee1de3f1e2c3491 Mon Sep 17 00:00:00 2001 From: schmidad Date: Wed, 18 Feb 2015 20:46:09 +1100 Subject: [PATCH] Created input stats endpoint --- .../main/java/com/netflix/suro/TagKey.java | 1 + .../com/netflix/suro/input/SuroInput.java | 2 + .../suro/input/kafka/KafkaConsumer.java | 5 + .../suro/input/remotefile/S3Consumer.java | 5 + .../com/netflix/suro/input/InputManager.java | 9 ++ .../input/thrift/MessageSetProcessor.java | 124 +++++++++++++++--- .../suro/input/thrift/ThriftServer.java | 5 + .../com/netflix/suro/server/InputStat.java | 29 ++++ .../com/netflix/suro/server/StatusServer.java | 1 + 9 files changed, 164 insertions(+), 17 deletions(-) create mode 100644 suro-server/src/main/java/com/netflix/suro/server/InputStat.java diff --git a/suro-core/src/main/java/com/netflix/suro/TagKey.java b/suro-core/src/main/java/com/netflix/suro/TagKey.java index 47aa35ba..598e0d08 100644 --- a/suro-core/src/main/java/com/netflix/suro/TagKey.java +++ b/suro-core/src/main/java/com/netflix/suro/TagKey.java @@ -27,4 +27,5 @@ public class TagKey { public static final String RETRIED_COUNT = "retriedCount"; public static final String ROUTING_KEY = "routingKey"; public static final String REJECTED_REASON = "rejectedReason"; + public static final String INPUT = "suro.input"; } diff --git a/suro-core/src/main/java/com/netflix/suro/input/SuroInput.java b/suro-core/src/main/java/com/netflix/suro/input/SuroInput.java index d6bc07fe..e07cceba 100644 --- a/suro-core/src/main/java/com/netflix/suro/input/SuroInput.java +++ b/suro-core/src/main/java/com/netflix/suro/input/SuroInput.java @@ -9,4 +9,6 @@ public interface SuroInput { void shutdown(); void setPause(long ms); + + String getStat(); } diff --git a/suro-kafka-consumer/src/main/java/com/netflix/suro/input/kafka/KafkaConsumer.java b/suro-kafka-consumer/src/main/java/com/netflix/suro/input/kafka/KafkaConsumer.java index c3743a6c..5ac6f7cb 100644 --- a/suro-kafka-consumer/src/main/java/com/netflix/suro/input/kafka/KafkaConsumer.java +++ b/suro-kafka-consumer/src/main/java/com/netflix/suro/input/kafka/KafkaConsumer.java @@ -127,6 +127,11 @@ public void setPause(long ms) { pausedTime.addAndGet(ms); } + @Override + public String getStat() { + return "n/a"; + } + private void stop() { running = false; try { diff --git a/suro-s3/src/main/java/com/netflix/suro/input/remotefile/S3Consumer.java b/suro-s3/src/main/java/com/netflix/suro/input/remotefile/S3Consumer.java index 8580eb6d..73fb4ed1 100644 --- a/suro-s3/src/main/java/com/netflix/suro/input/remotefile/S3Consumer.java +++ b/suro-s3/src/main/java/com/netflix/suro/input/remotefile/S3Consumer.java @@ -193,6 +193,11 @@ public void setPause(long ms) { pausedTime.addAndGet(ms); } + @Override + public String getStat() { + return "n/a"; + } + public static TypeReference> typeReference = new TypeReference>() {}; private static final int retryCount = 5; private static final int sleepOnS3Exception = 5000; diff --git a/suro-server/src/main/java/com/netflix/suro/input/InputManager.java b/suro-server/src/main/java/com/netflix/suro/input/InputManager.java index b9e375db..79421943 100644 --- a/suro-server/src/main/java/com/netflix/suro/input/InputManager.java +++ b/suro-server/src/main/java/com/netflix/suro/input/InputManager.java @@ -64,4 +64,13 @@ public void shutdown() { input.shutdown(); } } + + public String reportInputStat() { + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : inputMap.entrySet()) { + sb.append(entry.getKey()).append(':').append(entry.getValue().getStat()).append("\n\n"); + } + + return sb.toString(); + } } diff --git a/suro-server/src/main/java/com/netflix/suro/input/thrift/MessageSetProcessor.java b/suro-server/src/main/java/com/netflix/suro/input/thrift/MessageSetProcessor.java index fa98c9e8..602a2520 100644 --- a/suro-server/src/main/java/com/netflix/suro/input/thrift/MessageSetProcessor.java +++ b/suro-server/src/main/java/com/netflix/suro/input/thrift/MessageSetProcessor.java @@ -17,11 +17,13 @@ package com.netflix.suro.input.thrift; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; import com.google.inject.Inject; import com.netflix.governator.guice.lazy.LazySingleton; +import com.netflix.servo.DefaultMonitorRegistry; import com.netflix.servo.annotations.DataSourceType; import com.netflix.servo.annotations.Monitor; -import com.netflix.servo.monitor.DynamicCounter; +import com.netflix.servo.monitor.BasicCounter; import com.netflix.servo.monitor.MonitorConfig; import com.netflix.servo.monitor.Monitors; import com.netflix.suro.ClientConfig; @@ -33,6 +35,7 @@ import com.netflix.suro.message.MessageSetReader; import com.netflix.suro.queue.Queue4Server; import com.netflix.suro.routing.MessageRouter; +import com.netflix.suro.servo.Servo; import com.netflix.suro.thrift.*; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -127,9 +130,12 @@ public Result process(TMessageSet messageSet) throws TException { try { // Stop adding chunks if it's no running if ( !isRunning) { - DynamicCounter.increment(rejectedMessageCountMetrics, - TagKey.APP, messageSet.getApp(), - TagKey.REJECTED_REASON, "SURO_STOPPED"); + Servo.getCounter( + MonitorConfig.builder(rejectedMessageCountMetrics) + .withTag(TagKey.APP, messageSet.getApp()) + .withTag(TagKey.INPUT, this.input.getId()) + .withTag(TagKey.REJECTED_REASON, "SURO_STOPPED") + .build()).increment(); log.warn("Message processor is not running. Message rejected"); result.setMessage("Suro server stopped"); @@ -138,9 +144,12 @@ public Result process(TMessageSet messageSet) throws TException { } if ( !isTakingTraffic ) { - DynamicCounter.increment(rejectedMessageCountMetrics, - TagKey.APP, messageSet.getApp(), - TagKey.REJECTED_REASON, "SURO_THROTTLING"); + Servo.getCounter( + MonitorConfig.builder(rejectedMessageCountMetrics) + .withTag(TagKey.APP, messageSet.getApp()) + .withTag(TagKey.INPUT, this.input.getId()) + .withTag(TagKey.REJECTED_REASON, "SURO_THROTTLING") + .build()).increment(); log.warn("Suro is not taking traffic. Message rejected. "); result.setMessage("Suro server is not taking traffic"); @@ -150,7 +159,11 @@ public Result process(TMessageSet messageSet) throws TException { MessageSetReader reader = new MessageSetReader(messageSet); if (!reader.checkCRC()) { - DynamicCounter.increment(dataCorruptionCountMetrics, TagKey.APP, messageSet.getApp()); + Servo.getCounter( + MonitorConfig.builder(dataCorruptionCountMetrics) + .withTag(TagKey.APP, messageSet.getApp()) + .withTag(TagKey.INPUT, this.input.getId()) + .build()).increment(); result.setMessage("data corrupted"); result.setResultCode(ResultCode.CRC_CORRUPTED); @@ -158,16 +171,20 @@ public Result process(TMessageSet messageSet) throws TException { } if (queue.offer(messageSet)) { - DynamicCounter.increment( - MonitorConfig.builder(messageCountMetrics) - .withTag(TagKey.APP, messageSet.getApp()) - .build(), - messageSet.getNumMessages()); + Servo.getCounter( + MonitorConfig.builder(messageCountMetrics) + .withTag(TagKey.APP, messageSet.getApp()) + .withTag(TagKey.INPUT, this.input.getId()) + .build()).increment(); result.setMessage(Long.toString(messageSet.getCrc())); result.setResultCode(ResultCode.OK); } else { - DynamicCounter.increment(retryCountMetrics, TagKey.APP, messageSet.getApp()); + Servo.getCounter( + MonitorConfig.builder(retryCountMetrics) + .withTag(TagKey.APP, messageSet.getApp()) + .withTag(TagKey.INPUT, this.input.getId()) + .build()).increment(); result.setMessage(Long.toString(messageSet.getCrc())); result.setResultCode(ResultCode.QUEUE_FULL); @@ -234,9 +251,12 @@ private void processMessageSet(TMessageSet tMessageSet) { try { router.process(input, new DefaultMessageContainer(message, jsonMapper)); } catch (Exception e) { - DynamicCounter.increment(messageProcessErrorMetrics, - TagKey.APP, tMessageSet.getApp(), - TagKey.DATA_SOURCE, message.getRoutingKey()); + Servo.getCounter( + MonitorConfig.builder(messageProcessErrorMetrics) + .withTag(TagKey.APP, tMessageSet.getApp()) + .withTag(TagKey.DATA_SOURCE, message.getRoutingKey()) + .withTag(TagKey.INPUT, this.input.getId()) + .build()).increment(); log.error(String.format("Failed to process message %s: %s", message, e.getMessage()), e); } @@ -276,4 +296,74 @@ public TMessageSet poll(long timeout, TimeUnit unit) { public void setInput(SuroInput input) { this.input = input; } + + public String getStat() { + StringBuilder sb = new StringBuilder(); + + StringBuilder messageCount = new StringBuilder(); + StringBuilder retryCount = new StringBuilder(); + StringBuilder dataCorruptionCount = new StringBuilder(); + StringBuilder rejectedMessageCount = new StringBuilder(); + StringBuilder messageProcessError = new StringBuilder(); + + /* + private static final String messageCountMetrics = "messageCount"; + private static final String retryCountMetrics = "retryCount"; + private static final String dataCorruptionCountMetrics = "corruptedMessageCount"; + private static final String rejectedMessageCountMetrics = "rejectedMessageCount"; + private static final String messageProcessErrorMetrics = "processErrorCount"; + */ + + for (com.netflix.servo.monitor.Monitor m : DefaultMonitorRegistry.getInstance().getRegisteredMonitors()) { + log.debug("Got monitor of type {}", m); + + if(m instanceof BasicCounter) { + BasicCounter counter = (BasicCounter)m; + String inputId = counter.getConfig().getTags().getValue(TagKey.INPUT); + if(!Strings.isNullOrEmpty(inputId) && inputId.equals(input.getId())){ + if (counter.getConfig().getName().equals(messageCountMetrics)) { + messageCount + .append(counter.getConfig().getTags().getValue(TagKey.APP)) + .append(":") + .append(counter.getValue()).append('\n'); + } + if (counter.getConfig().getName().equals(retryCountMetrics)) { + retryCount + .append(counter.getConfig().getTags().getValue(TagKey.APP)) + .append(":") + .append(counter.getValue()).append('\n'); + } + if (counter.getConfig().getName().equals(dataCorruptionCountMetrics)) { + dataCorruptionCount + .append(counter.getConfig().getTags().getValue(TagKey.APP)) + .append(":") + .append(counter.getValue()).append('\n'); + } + if (counter.getConfig().getName().equals(rejectedMessageCountMetrics)) { + rejectedMessageCount + .append(counter.getConfig().getTags().getValue(TagKey.APP)) + .append(":") + .append(counter.getConfig().getTags().getValue(TagKey.REJECTED_REASON)) + .append(":") + .append(counter.getValue()).append('\n'); + } + if (counter.getConfig().getName().equals(messageProcessErrorMetrics)) { + messageProcessError + .append(counter.getConfig().getTags().getValue(TagKey.APP)) + .append(":") + .append(counter.getConfig().getTags().getValue(TagKey.DATA_SOURCE)) + .append(":") + .append(counter.getValue()).append('\n'); + } + } + } + } + + sb.append('\n').append(messageCountMetrics).append('\n').append(messageCount.toString()); + sb.append('\n').append(retryCountMetrics).append('\n').append(retryCount.toString()); + sb.append('\n').append(dataCorruptionCountMetrics).append('\n').append(dataCorruptionCount.toString()); + sb.append('\n').append(rejectedMessageCountMetrics).append('\n').append(rejectedMessageCount.toString()); + sb.append('\n').append(messageProcessErrorMetrics).append('\n').append(messageProcessError.toString()); + return sb.toString(); + } } diff --git a/suro-server/src/main/java/com/netflix/suro/input/thrift/ThriftServer.java b/suro-server/src/main/java/com/netflix/suro/input/thrift/ThriftServer.java index 9ddb8aee..a3c5928b 100644 --- a/suro-server/src/main/java/com/netflix/suro/input/thrift/ThriftServer.java +++ b/suro-server/src/main/java/com/netflix/suro/input/thrift/ThriftServer.java @@ -156,6 +156,11 @@ public void run() { } } + @Override + public String getStat() { + return msgProcessor.getStat(); + } + // for testing purpose public int getPort() { return port; diff --git a/suro-server/src/main/java/com/netflix/suro/server/InputStat.java b/suro-server/src/main/java/com/netflix/suro/server/InputStat.java new file mode 100644 index 00000000..ccd5d676 --- /dev/null +++ b/suro-server/src/main/java/com/netflix/suro/server/InputStat.java @@ -0,0 +1,29 @@ +package com.netflix.suro.server; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.netflix.suro.input.InputManager; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; + +/** + * Created by adamschmidt on 18/02/15. + */ +@Singleton +@Path("/suroinputstat") +public class InputStat { + private final InputManager inputManager; + + @Inject + public InputStat(InputManager inputManager) { + this.inputManager = inputManager; + } + + @GET + @Produces("text/plain") + public String get() { + return inputManager.reportInputStat(); + } +} diff --git a/suro-server/src/main/java/com/netflix/suro/server/StatusServer.java b/suro-server/src/main/java/com/netflix/suro/server/StatusServer.java index 35703a4a..61e176de 100644 --- a/suro-server/src/main/java/com/netflix/suro/server/StatusServer.java +++ b/suro-server/src/main/java/com/netflix/suro/server/StatusServer.java @@ -49,6 +49,7 @@ public static ServletModule createJerseyServletModule() { protected void configureServlets() { bind(HealthCheck.class); bind(SinkStat.class); + bind(InputStat.class); bind(GuiceContainer.class).asEagerSingleton(); serve("/*").with(GuiceContainer.class); }