diff --git a/statsdaemon.go b/statsdaemon.go index a4d24e6..654063a 100644 --- a/statsdaemon.go +++ b/statsdaemon.go @@ -96,7 +96,8 @@ var ( debug = flag.Bool("debug", false, "print statistics sent to graphite") showVersion = flag.Bool("version", false, "print version string") deleteGauges = flag.Bool("delete-gauges", true, "don't send values to graphite for inactive gauges, as opposed to sending the previous value") - persistCountKeys = flag.Int64("persist-count-keys", 60, "number of flush-intervals to persist count keys") + persistCountKeys = flag.Uint("persist-count-keys", 60, "number of flush-intervals to persist count keys (at zero)") + persistTimerKeys = flag.Uint("persist-timer-counts", 0, "number of flush-intervals to persist timer count keys (at zero)") receiveCounter = flag.String("receive-counter", "", "Metric name for total metrics received per interval") percentThreshold = Percentiles{} prefix = flag.String("prefix", "", "Prefix for all stats") @@ -109,12 +110,14 @@ func init() { } var ( + receiveCount uint64 In = make(chan *Packet, MAX_UNPROCESSED_PACKETS) counters = make(map[string]float64) gauges = make(map[string]float64) timers = make(map[string]Float64Slice) - countInactivity = make(map[string]int64) sets = make(map[string][]string) + inactivCounters = make(map[string]uint) + inactivTimers = make(map[string]uint) ) func monitor() { @@ -139,55 +142,31 @@ func monitor() { } func packetHandler(s *Packet) { - if *receiveCounter != "" { - v, ok := counters[*receiveCounter] - if !ok || v < 0 { - counters[*receiveCounter] = 0 - } - counters[*receiveCounter] += 1 - } + receiveCount++ switch s.Modifier { case "ms": - _, ok := timers[s.Bucket] - if !ok { - var t Float64Slice - timers[s.Bucket] = t + vals := timers[s.Bucket] + if vals == nil { + vals = make([]float64, 1, 5) + vals[0] = 0.0 } - timers[s.Bucket] = append(timers[s.Bucket], s.ValFlt) + // first slot is sampled count, following are times + vals[0] += float64(1 / s.Sampling) + timers[s.Bucket] = append(vals, s.ValFlt) case "g": - gaugeValue, _ := gauges[s.Bucket] - + var gaugeValue float64 if s.ValStr == "" { gaugeValue = s.ValFlt } else if s.ValStr == "+" { - // watch out for overflows - if s.ValFlt > (math.MaxFloat64 - gaugeValue) { - gaugeValue = math.MaxFloat64 - } else { - gaugeValue += s.ValFlt - } + gaugeValue = gauges[s.Bucket] + s.ValFlt } else if s.ValStr == "-" { - // subtract checking for negative numbers - if s.ValFlt > gaugeValue { - gaugeValue = 0 - } else { - gaugeValue -= s.ValFlt - } + gaugeValue = gauges[s.Bucket] - s.ValFlt } - gauges[s.Bucket] = gaugeValue case "c": - _, ok := counters[s.Bucket] - if !ok { - counters[s.Bucket] = 0 - } - counters[s.Bucket] += s.ValFlt * float64(1/s.Sampling) + counters[s.Bucket] += s.ValFlt / float64(s.Sampling) case "s": - _, ok := sets[s.Bucket] - if !ok { - sets[s.Bucket] = make([]string, 0) - } sets[s.Bucket] = append(sets[s.Bucket], s.ValStr) } } @@ -251,21 +230,38 @@ func submit(deadline time.Time) error { func processCounters(buffer *bytes.Buffer, now int64) int64 { var num int64 - // continue sending zeros for counters for a short period of time even if we have no new data + persist := *persistCountKeys + + // avoid adding prefix/postfix to receiveCounter + if *receiveCounter != "" && receiveCount > 0 { + fmt.Fprintf(buffer, "%s %d %d\n", *receiveCounter, receiveCount, now) + if persist > 0 { + inactivCounters[*receiveCounter] = 0 + } + num++ + } + receiveCount = 0 + for bucket, value := range counters { - fmt.Fprintf(buffer, "%s %s %d\n", bucket, strconv.FormatFloat(value, 'f', -1, 64), now) + fullbucket := *prefix + bucket + *postfix + fmt.Fprintf(buffer, "%s %s %d\n", fullbucket, strconv.FormatFloat(value, 'f', -1, 64), now) delete(counters, bucket) - countInactivity[bucket] = 0 + if persist > 0 { + inactivCounters[fullbucket] = 0 + } num++ } - for bucket, purgeCount := range countInactivity { + + // continue sending zeros for no-longer-active counters for configured flush-intervals + for bucket, purgeCount := range inactivCounters { if purgeCount > 0 { fmt.Fprintf(buffer, "%s 0 %d\n", bucket, now) num++ } - countInactivity[bucket] += 1 - if countInactivity[bucket] > *persistCountKeys { - delete(countInactivity, bucket) + if purgeCount >= persist { + delete(inactivCounters, bucket) + } else { + inactivCounters[bucket] = purgeCount + 1 } } return num @@ -275,7 +271,8 @@ func processGauges(buffer *bytes.Buffer, now int64) int64 { var num int64 for bucket, currentValue := range gauges { - fmt.Fprintf(buffer, "%s %s %d\n", bucket, strconv.FormatFloat(currentValue, 'f', -1, 64), now) + valstr := strconv.FormatFloat(currentValue, 'f', -1, 64) + fmt.Fprintf(buffer, "%s%s%s %s %d\n", *prefix, bucket, *postfix, valstr, now) num++ if *deleteGauges { delete(gauges, bucket) @@ -293,7 +290,7 @@ func processSets(buffer *bytes.Buffer, now int64) int64 { uniqueSet[str] = true } - fmt.Fprintf(buffer, "%s %d %d\n", bucket, len(uniqueSet), now) + fmt.Fprintf(buffer, "%s%s%s %d %d\n", *prefix, bucket, *postfix, len(uniqueSet), now) delete(sets, bucket) } return num @@ -301,9 +298,12 @@ func processSets(buffer *bytes.Buffer, now int64) int64 { func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 { var num int64 + persist := *persistTimerKeys + for bucket, timer := range timers { - bucketWithoutPostfix := bucket[:len(bucket)-len(*postfix)] num++ + sampled := timer[0] + timer = timer[1:] sort.Sort(timer) min := timer[0] @@ -334,29 +334,45 @@ func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 { maxAtThreshold = timer[indexOfPerc] } - var tmpl string - var pctstr string - if pct.float >= 0 { - tmpl = "%s.upper_%s%s %s %d\n" - pctstr = pct.str - } else { - tmpl = "%s.lower_%s%s %s %d\n" + ptype := "upper" + pctstr := pct.str + if pct.float < 0 { + ptype = "lower" pctstr = pct.str[1:] } threshold_s := strconv.FormatFloat(maxAtThreshold, 'f', -1, 64) - fmt.Fprintf(buffer, tmpl, bucketWithoutPostfix, pctstr, *postfix, threshold_s, now) + fmt.Fprintf(buffer, "%s%s.%s_%s%s %s %d\n", + *prefix, bucket, ptype, pctstr, *postfix, threshold_s, now) } mean_s := strconv.FormatFloat(mean, 'f', -1, 64) max_s := strconv.FormatFloat(max, 'f', -1, 64) min_s := strconv.FormatFloat(min, 'f', -1, 64) + count_s := strconv.FormatFloat(sampled, 'f', -1, 64) - fmt.Fprintf(buffer, "%s.mean%s %s %d\n", bucketWithoutPostfix, *postfix, mean_s, now) - fmt.Fprintf(buffer, "%s.upper%s %s %d\n", bucketWithoutPostfix, *postfix, max_s, now) - fmt.Fprintf(buffer, "%s.lower%s %s %d\n", bucketWithoutPostfix, *postfix, min_s, now) - fmt.Fprintf(buffer, "%s.count%s %d %d\n", bucketWithoutPostfix, *postfix, count, now) + fmt.Fprintf(buffer, "%s%s.mean%s %s %d\n", *prefix, bucket, *postfix, mean_s, now) + fmt.Fprintf(buffer, "%s%s.upper%s %s %d\n", *prefix, bucket, *postfix, max_s, now) + fmt.Fprintf(buffer, "%s%s.lower%s %s %d\n", *prefix, bucket, *postfix, min_s, now) + fmt.Fprintf(buffer, "%s%s.count%s %s %d\n", *prefix, bucket, *postfix, count_s, now) delete(timers, bucket) + if persist > 0 { + countKey := fmt.Sprintf("%s%s.count%s", *prefix, bucket, *postfix) + inactivTimers[countKey] = 0 + } + } + + // continue sending zeros for no-longer-active timer counts for configured flush-intervals + for bucket, purgeCount := range inactivTimers { + if purgeCount > 0 { + fmt.Fprintf(buffer, "%s 0 %d\n", bucket, now) + num++ + } + if purgeCount >= persist { + delete(inactivTimers, bucket) + } else { + inactivTimers[bucket] = purgeCount + 1 + } } return num } @@ -513,7 +529,7 @@ func parseLine(line []byte) *Packet { } return &Packet{ - Bucket: *prefix + sanitizeBucket(name) + *postfix, + Bucket: sanitizeBucket(name), ValFlt: floatval, ValStr: strval, Modifier: typeCode, @@ -579,11 +595,11 @@ func main() { fmt.Printf("statsdaemon v%s (built w/%s)\n", VERSION, runtime.Version()) return } - *prefix = sanitizeBucket([]byte(*prefix)) - *postfix = sanitizeBucket([]byte(*postfix)) + flag.Set("prefix", sanitizeBucket([]byte(*prefix))) + flag.Set("postfix", sanitizeBucket([]byte(*postfix))) signalchan = make(chan os.Signal, 1) - signal.Notify(signalchan, syscall.SIGTERM) + signal.Notify(signalchan, syscall.SIGTERM, syscall.SIGINT) go udpListener() if *tcpServiceAddress != "" { diff --git a/statsdaemon_test.go b/statsdaemon_test.go index a657a9b..adc08a9 100644 --- a/statsdaemon_test.go +++ b/statsdaemon_test.go @@ -202,21 +202,15 @@ func TestParseLineMisc(t *testing.T) { assert.Equal(t, float32(1), packet.Sampling) flag.Set("prefix", "test.") + flag.Set("postfix", ".test") d = []byte("prefix:4|c") packet = parseLine(d) - assert.Equal(t, "test.prefix", packet.Bucket) + // prefix/postfix not in Bucket/key, added in processCounters() etc + assert.Equal(t, "prefix", packet.Bucket) assert.Equal(t, float64(4), packet.ValFlt) assert.Equal(t, "c", packet.Modifier) assert.Equal(t, float32(1), packet.Sampling) flag.Set("prefix", "") - - flag.Set("postfix", ".test") - d = []byte("postfix:4|c") - packet = parseLine(d) - assert.Equal(t, "postfix.test", packet.Bucket) - assert.Equal(t, float64(4), packet.ValFlt) - assert.Equal(t, "c", packet.Modifier) - assert.Equal(t, float32(1), packet.Sampling) flag.Set("postfix", "") d = []byte("a.key.with-0.dash:4|c\ngauge:3|g") @@ -367,8 +361,9 @@ func TestMultiTcp(t *testing.T) { } func TestPacketHandlerReceiveCounter(t *testing.T) { + flag.Set("receive-counter", "countme") + receiveCount = 0 counters = make(map[string]float64) - *receiveCounter = "countme" p := &Packet{ Bucket: "gorets", @@ -377,10 +372,11 @@ func TestPacketHandlerReceiveCounter(t *testing.T) { Sampling: float32(1), } packetHandler(p) - assert.Equal(t, counters["countme"], float64(1)) - + assert.Equal(t, receiveCount, uint64(1)) packetHandler(p) - assert.Equal(t, counters["countme"], float64(2)) + assert.Equal(t, receiveCount, uint64(2)) + + flag.Set("receive-counter", "") } func TestPacketHandlerCount(t *testing.T) { @@ -433,14 +429,14 @@ func TestPacketHandlerGauge(t *testing.T) { packetHandler(p) assert.Equal(t, gauges["gaugor"], float64(327)) - // <0 overflow + // going negative p.ValFlt = 10 p.ValStr = "" packetHandler(p) p.ValFlt = 20 p.ValStr = "-" packetHandler(p) - assert.Equal(t, gauges["gaugor"], float64(0)) + assert.Equal(t, gauges["gaugor"], float64(-10)) // >MaxFloat64 overflow p.ValFlt = float64(math.MaxFloat64 - 10) @@ -462,13 +458,15 @@ func TestPacketHandlerTimer(t *testing.T) { Sampling: float32(1), } packetHandler(p) - assert.Equal(t, len(timers["glork"]), 1) - assert.Equal(t, timers["glork"][0], float64(320)) + assert.Equal(t, len(timers["glork"]), 2) + assert.Equal(t, timers["glork"][0], float64(1)) + assert.Equal(t, timers["glork"][1], float64(320)) p.ValFlt = float64(100) packetHandler(p) - assert.Equal(t, len(timers["glork"]), 2) - assert.Equal(t, timers["glork"][1], float64(100)) + assert.Equal(t, len(timers["glork"]), 3) + assert.Equal(t, timers["glork"][0], float64(2)) + assert.Equal(t, timers["glork"][2], float64(100)) } func TestPacketHandlerSet(t *testing.T) { @@ -491,8 +489,8 @@ func TestPacketHandlerSet(t *testing.T) { } func TestProcessCounters(t *testing.T) { - - *persistCountKeys = int64(10) + flag.Set("persist-count-keys", "10") + receiveCount = 0 counters = make(map[string]float64) var buffer bytes.Buffer now := int64(1418052649) @@ -504,7 +502,7 @@ func TestProcessCounters(t *testing.T) { assert.Equal(t, buffer.String(), "gorets 123 1418052649\n") // run processCounters() enough times to make sure it purges items - for i := 0; i < int(*persistCountKeys)+10; i++ { + for i := 0; i < int(*persistCountKeys); i++ { num = processCounters(&buffer, now) } lines := bytes.Split(buffer.Bytes(), []byte("\n")) @@ -515,10 +513,41 @@ func TestProcessCounters(t *testing.T) { assert.Equal(t, string(lines[*persistCountKeys]), "gorets 0 1418052649") } +func TestProcessCountersPrefix(t *testing.T) { + counters = make(map[string]float64) + var buffer bytes.Buffer + now := int64(1418052649) + + flag.Set("persist-count-keys", "2") + flag.Set("prefix", "pre.") + flag.Set("postfix", ".post") + + counters["gorets"] = float64(123) + num := processCounters(&buffer, now) + firstOutput := buffer.String() + // run processCounters() enough times to make sure it purges items + for i := 0; i < int(*persistCountKeys); i++ { + processCounters(&buffer, now) + } + + // set back flags before asserting + flag.Set("persist-count-keys", "60") + flag.Set("prefix", "") + flag.Set("postfix", "") + + assert.Equal(t, num, int64(1)) + assert.Equal(t, firstOutput, "pre.gorets.post 123 1418052649\n") + + lines := bytes.Split(buffer.Bytes(), []byte("\n")) + assert.Equal(t, string(lines[0]), "pre.gorets.post 123 1418052649") + assert.Equal(t, string(lines[1]), "pre.gorets.post 0 1418052649") + assert.Equal(t, string(lines[2]), "pre.gorets.post 0 1418052649") +} + func TestProcessTimers(t *testing.T) { // Some data with expected mean of 20 timers = make(map[string]Float64Slice) - timers["response_time"] = []float64{0, 30, 30} + timers["response_time"] = []float64{3, 0, 30, 30} now := int64(1418052649) @@ -625,7 +654,7 @@ func TestProcessSets(t *testing.T) { func TestProcessTimersUpperPercentile(t *testing.T) { // Some data with expected 75% of 2 timers = make(map[string]Float64Slice) - timers["response_time"] = []float64{0, 1, 2, 3} + timers["response_time"] = []float64{4, 0, 1, 2, 3} now := int64(1418052649) @@ -644,11 +673,11 @@ func TestProcessTimersUpperPercentile(t *testing.T) { } func TestProcessTimersUpperPercentilePostfix(t *testing.T) { + flag.Set("prefix", "pfx.") flag.Set("postfix", ".test") // Some data with expected 75% of 2 timers = make(map[string]Float64Slice) - timers["postfix_response_time.test"] = []float64{0, 1, 2, 3} - + timers["postfix_response_time"] = []float64{4, 0, 1, 2, 3} now := int64(1418052649) var buffer bytes.Buffer @@ -658,17 +687,19 @@ func TestProcessTimersUpperPercentilePostfix(t *testing.T) { "75", }, }) - lines := bytes.Split(buffer.Bytes(), []byte("\n")) - assert.Equal(t, num, int64(1)) - assert.Equal(t, string(lines[0]), "postfix_response_time.upper_75.test 2 1418052649") + // set flags back before asserting + flag.Set("prefix", "") flag.Set("postfix", "") + + assert.Equal(t, num, int64(1)) + assert.Equal(t, string(lines[0]), "pfx.postfix_response_time.upper_75.test 2 1418052649") } func TestProcessTimesLowerPercentile(t *testing.T) { timers = make(map[string]Float64Slice) - timers["time"] = []float64{0, 1, 2, 3} + timers["time"] = []float64{4, 0, 1, 2, 3} now := int64(1418052649) @@ -740,7 +771,11 @@ func TestMultipleUDPSends(t *testing.T) { } func BenchmarkManyDifferentSensors(t *testing.B) { + counters = make(map[string]float64) + gauges = make(map[string]float64) + timers = make(map[string]Float64Slice) r := rand.New(rand.NewSource(438)) + for i := 0; i < 1000; i++ { bucket := "response_time" + strconv.Itoa(i) for i := 0; i < 10000; i++ {