Skip to content

Commit

Permalink
Created input stats endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidad committed Feb 19, 2015
1 parent 31c2ec9 commit 83bfce8
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 17 deletions.
1 change: 1 addition & 0 deletions suro-core/src/main/java/com/netflix/suro/TagKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public class TagKey {
public static final String RETRIED_COUNT = "retriedCount";
public static final String ROUTING_KEY = "routingKey";
public static final String REJECTED_REASON = "rejectedReason";
public static final String INPUT = "suro.input";
}
2 changes: 2 additions & 0 deletions suro-core/src/main/java/com/netflix/suro/input/SuroInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public interface SuroInput {
void shutdown();

void setPause(long ms);

String getStat();
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public void setPause(long ms) {
pausedTime.addAndGet(ms);
}

@Override
public String getStat() {
return "n/a";
}

private void stop() {
running = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ public void setPause(long ms) {
pausedTime.addAndGet(ms);
}

@Override
public String getStat() {
return "n/a";
}

public static TypeReference<Map<String, Object>> typeReference = new TypeReference<Map<String, Object>>() {};
private static final int retryCount = 5;
private static final int sleepOnS3Exception = 5000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,13 @@ public void shutdown() {
input.shutdown();
}
}

public String reportInputStat() {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, SuroInput> entry : inputMap.entrySet()) {
sb.append(entry.getKey()).append(':').append(entry.getValue().getStat()).append("\n\n");
}

return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package com.netflix.suro.input.thrift;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.netflix.governator.guice.lazy.LazySingleton;
import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.DynamicCounter;
import com.netflix.servo.monitor.BasicCounter;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import com.netflix.suro.ClientConfig;
Expand All @@ -33,6 +35,7 @@
import com.netflix.suro.message.MessageSetReader;
import com.netflix.suro.queue.Queue4Server;
import com.netflix.suro.routing.MessageRouter;
import com.netflix.suro.servo.Servo;
import com.netflix.suro.thrift.*;
import org.apache.thrift.TException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -127,9 +130,12 @@ public Result process(TMessageSet messageSet) throws TException {
try {
// Stop adding chunks if it's no running
if ( !isRunning) {
DynamicCounter.increment(rejectedMessageCountMetrics,
TagKey.APP, messageSet.getApp(),
TagKey.REJECTED_REASON, "SURO_STOPPED");
Servo.getCounter(
MonitorConfig.builder(rejectedMessageCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.withTag(TagKey.REJECTED_REASON, "SURO_STOPPED")
.build()).increment();

log.warn("Message processor is not running. Message rejected");
result.setMessage("Suro server stopped");
Expand All @@ -138,9 +144,12 @@ public Result process(TMessageSet messageSet) throws TException {
}

if ( !isTakingTraffic ) {
DynamicCounter.increment(rejectedMessageCountMetrics,
TagKey.APP, messageSet.getApp(),
TagKey.REJECTED_REASON, "SURO_THROTTLING");
Servo.getCounter(
MonitorConfig.builder(rejectedMessageCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.withTag(TagKey.REJECTED_REASON, "SURO_THROTTLING")
.build()).increment();

log.warn("Suro is not taking traffic. Message rejected. ");
result.setMessage("Suro server is not taking traffic");
Expand All @@ -150,24 +159,32 @@ public Result process(TMessageSet messageSet) throws TException {

MessageSetReader reader = new MessageSetReader(messageSet);
if (!reader.checkCRC()) {
DynamicCounter.increment(dataCorruptionCountMetrics, TagKey.APP, messageSet.getApp());
Servo.getCounter(
MonitorConfig.builder(dataCorruptionCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.build()).increment();

result.setMessage("data corrupted");
result.setResultCode(ResultCode.CRC_CORRUPTED);
return result;
}

if (queue.offer(messageSet)) {
DynamicCounter.increment(
MonitorConfig.builder(messageCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.build(),
messageSet.getNumMessages());
Servo.getCounter(
MonitorConfig.builder(messageCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.build()).increment();

result.setMessage(Long.toString(messageSet.getCrc()));
result.setResultCode(ResultCode.OK);
} else {
DynamicCounter.increment(retryCountMetrics, TagKey.APP, messageSet.getApp());
Servo.getCounter(
MonitorConfig.builder(retryCountMetrics)
.withTag(TagKey.APP, messageSet.getApp())
.withTag(TagKey.INPUT, this.input.getId())
.build()).increment();

result.setMessage(Long.toString(messageSet.getCrc()));
result.setResultCode(ResultCode.QUEUE_FULL);
Expand Down Expand Up @@ -234,9 +251,12 @@ private void processMessageSet(TMessageSet tMessageSet) {
try {
router.process(input, new DefaultMessageContainer(message, jsonMapper));
} catch (Exception e) {
DynamicCounter.increment(messageProcessErrorMetrics,
TagKey.APP, tMessageSet.getApp(),
TagKey.DATA_SOURCE, message.getRoutingKey());
Servo.getCounter(
MonitorConfig.builder(messageProcessErrorMetrics)
.withTag(TagKey.APP, tMessageSet.getApp())
.withTag(TagKey.DATA_SOURCE, message.getRoutingKey())
.withTag(TagKey.INPUT, this.input.getId())
.build()).increment();

log.error(String.format("Failed to process message %s: %s", message, e.getMessage()), e);
}
Expand Down Expand Up @@ -276,4 +296,74 @@ public TMessageSet poll(long timeout, TimeUnit unit) {
public void setInput(SuroInput input) {
this.input = input;
}

public String getStat() {
StringBuilder sb = new StringBuilder();

StringBuilder messageCount = new StringBuilder();
StringBuilder retryCount = new StringBuilder();
StringBuilder dataCorruptionCount = new StringBuilder();
StringBuilder rejectedMessageCount = new StringBuilder();
StringBuilder messageProcessError = new StringBuilder();

/*
private static final String messageCountMetrics = "messageCount";
private static final String retryCountMetrics = "retryCount";
private static final String dataCorruptionCountMetrics = "corruptedMessageCount";
private static final String rejectedMessageCountMetrics = "rejectedMessageCount";
private static final String messageProcessErrorMetrics = "processErrorCount";
*/

for (com.netflix.servo.monitor.Monitor<?> m : DefaultMonitorRegistry.getInstance().getRegisteredMonitors()) {
log.debug("Got monitor of type {}", m);

if(m instanceof BasicCounter) {
BasicCounter counter = (BasicCounter)m;
String inputId = counter.getConfig().getTags().getValue(TagKey.INPUT);
if(!Strings.isNullOrEmpty(inputId) && inputId.equals(input.getId())){
if (counter.getConfig().getName().equals(messageCountMetrics)) {
messageCount
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getValue()).append('\n');
}
if (counter.getConfig().getName().equals(retryCountMetrics)) {
retryCount
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getValue()).append('\n');
}
if (counter.getConfig().getName().equals(dataCorruptionCountMetrics)) {
dataCorruptionCount
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getValue()).append('\n');
}
if (counter.getConfig().getName().equals(rejectedMessageCountMetrics)) {
rejectedMessageCount
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getConfig().getTags().getValue(TagKey.REJECTED_REASON))
.append(":")
.append(counter.getValue()).append('\n');
}
if (counter.getConfig().getName().equals(messageProcessErrorMetrics)) {
messageProcessError
.append(counter.getConfig().getTags().getValue(TagKey.APP))
.append(":")
.append(counter.getConfig().getTags().getValue(TagKey.DATA_SOURCE))
.append(":")
.append(counter.getValue()).append('\n');
}
}
}
}

sb.append('\n').append(messageCountMetrics).append('\n').append(messageCount.toString());
sb.append('\n').append(retryCountMetrics).append('\n').append(retryCount.toString());
sb.append('\n').append(dataCorruptionCountMetrics).append('\n').append(dataCorruptionCount.toString());
sb.append('\n').append(rejectedMessageCountMetrics).append('\n').append(rejectedMessageCount.toString());
sb.append('\n').append(messageProcessErrorMetrics).append('\n').append(messageProcessError.toString());
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public void run() {
}
}

@Override
public String getStat() {
return msgProcessor.getStat();
}

// for testing purpose
public int getPort() {
return port;
Expand Down
29 changes: 29 additions & 0 deletions suro-server/src/main/java/com/netflix/suro/server/InputStat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.netflix.suro.server;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.suro.input.InputManager;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

/**
* Created by adamschmidt on 18/02/15.
*/
@Singleton
@Path("/suroinputstat")
public class InputStat {
private final InputManager inputManager;

@Inject
public InputStat(InputManager inputManager) {
this.inputManager = inputManager;
}

@GET
@Produces("text/plain")
public String get() {
return inputManager.reportInputStat();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static ServletModule createJerseyServletModule() {
protected void configureServlets() {
bind(HealthCheck.class);
bind(SinkStat.class);
bind(InputStat.class);
bind(GuiceContainer.class).asEagerSingleton();
serve("/*").with(GuiceContainer.class);
}
Expand Down

0 comments on commit 83bfce8

Please sign in to comment.