From fa3fefb1a44ec80d0da8d50ae5b362c2df91515c Mon Sep 17 00:00:00 2001 From: Marcio Zacarias Date: Thu, 6 Aug 2020 11:41:17 -0400 Subject: [PATCH] Clean up old metrics using metrics timestamp --- cmd/prom-aggregation-gateway/main.go | 59 +++++++-- cmd/prom-aggregation-gateway/main_test.go | 140 +++++++++++----------- 2 files changed, 125 insertions(+), 74 deletions(-) diff --git a/cmd/prom-aggregation-gateway/main.go b/cmd/prom-aggregation-gateway/main.go index 35da3e3..56c955c 100644 --- a/cmd/prom-aggregation-gateway/main.go +++ b/cmd/prom-aggregation-gateway/main.go @@ -8,6 +8,7 @@ import ( "net/http" "sort" "sync" + "time" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" @@ -78,7 +79,18 @@ func mergeBuckets(a, b []*dto.Bucket) []*dto.Bucket { return output } +func makeTimestampMs() int64 { + return time.Now().UnixNano() / int64(time.Millisecond) +} + func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { + // Getting the metric timestamp or creating one when that metric is merged + // It will be used to cleanup old metrics that have not been merged lately + metricTimestamp := b.GetTimestampMs() + if metricTimestamp == 0 { + metricTimestamp = makeTimestampMs() + } + switch ty { case dto.MetricType_COUNTER: return &dto.Metric{ @@ -86,6 +98,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { Counter: &dto.Counter{ Value: float64ptr(*a.Counter.Value + *b.Counter.Value), }, + TimestampMs: &metricTimestamp, } case dto.MetricType_GAUGE: @@ -97,6 +110,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { Gauge: &dto.Gauge{ Value: float64ptr(*a.Gauge.Value + *b.Gauge.Value), }, + TimestampMs: &metricTimestamp, } case dto.MetricType_HISTOGRAM: @@ -107,6 +121,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { SampleSum: float64ptr(*a.Histogram.SampleSum + *b.Histogram.SampleSum), Bucket: mergeBuckets(a.Histogram.Bucket, b.Histogram.Bucket), }, + TimestampMs: &metricTimestamp, } case dto.MetricType_UNTYPED: @@ -115,6 +130,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric { Untyped: &dto.Untyped{ Value: float64ptr(*a.Untyped.Value + *b.Untyped.Value), }, + TimestampMs: &metricTimestamp, } case dto.MetricType_SUMMARY: @@ -164,13 +180,15 @@ func mergeFamily(a, b *dto.MetricFamily) (*dto.MetricFamily, error) { } type aggate struct { + timeToLiveMs int64 familiesLock sync.RWMutex families map[string]*dto.MetricFamily } -func newAggate() *aggate { +func newAggate(ttl int64) *aggate { return &aggate{ - families: map[string]*dto.MetricFamily{}, + timeToLiveMs: ttl, + families: map[string]*dto.MetricFamily{}, } } @@ -196,6 +214,21 @@ func validateFamily(f *dto.MetricFamily) error { return nil } +func cleanupFamily(metrics []*dto.Metric, ttl int64) []*dto.Metric { + // CurrentTS for old metrics check + nowTS := makeTimestampMs() + + // Iterating over metrics and filtering out the old, not recently merged ones + var updatedMetrics []*dto.Metric + for _, metric := range metrics { + if nowTS-metric.GetTimestampMs() <= ttl { + updatedMetrics = append(updatedMetrics, metric) + } + } + + return updatedMetrics +} + func (a *aggate) parseAndMerge(r io.Reader) error { var parser expfmt.TextParser inFamilies, err := parser.TextToMetricFamilies(r) @@ -240,11 +273,20 @@ func (a *aggate) handler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", string(contentType)) enc := expfmt.NewEncoder(w, contentType) - a.familiesLock.RLock() - defer a.familiesLock.RUnlock() + a.familiesLock.Lock() + defer a.familiesLock.Unlock() metricNames := []string{} for name := range a.families { - metricNames = append(metricNames, name) + // Cleaning up metrics that have not been merged for a while + a.families[name].Metric = cleanupFamily(a.families[name].GetMetric(), a.timeToLiveMs) + + // Including only families that still have metrics to be scraped + if len(a.families[name].Metric) > 0 { + metricNames = append(metricNames, name) + } else { + // Remove the empty families + delete(a.families, name) + } } sort.Sort(sort.StringSlice(metricNames)) @@ -259,12 +301,15 @@ func (a *aggate) handler(w http.ResponseWriter, r *http.Request) { } func main() { - listen := flag.String("listen", ":80", "Address and port to listen on.") + listen := flag.String("listen", ":8080", "Address and port to listen on.") cors := flag.String("cors", "*", "The 'Access-Control-Allow-Origin' value to be returned.") pushPath := flag.String("push-path", "/metrics/", "HTTP path to accept pushed metrics.") + timeToLiveMs := flag.Int64("time-to-live-ms", 3600000, "How long unmerged metrics will live, in milliseconds (default 1h)") flag.Parse() - a := newAggate() + log.Println("PAG started on port", *listen) + + a := newAggate(*timeToLiveMs) http.HandleFunc("/metrics", a.handler) http.HandleFunc(*pushPath, func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", *cors) diff --git a/cmd/prom-aggregation-gateway/main_test.go b/cmd/prom-aggregation-gateway/main_test.go index a205178..2802535 100644 --- a/cmd/prom-aggregation-gateway/main_test.go +++ b/cmd/prom-aggregation-gateway/main_test.go @@ -5,118 +5,119 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/pmezard/go-difflib/difflib" ) -const ( +var ( in1 = ` # HELP gauge A gauge # TYPE gauge gauge -gauge 42 +gauge 42 %[1]d # HELP counter A counter # TYPE counter counter -counter 31 +counter 31 %[1]d # HELP histogram A histogram # TYPE histogram histogram -histogram_bucket{le="1"} 0 -histogram_bucket{le="2"} 0 -histogram_bucket{le="3"} 3 -histogram_bucket{le="4"} 4 -histogram_bucket{le="5"} 4 -histogram_bucket{le="6"} 4 -histogram_bucket{le="7"} 4 -histogram_bucket{le="8"} 4 -histogram_bucket{le="9"} 4 -histogram_bucket{le="10"} 4 -histogram_bucket{le="+Inf"} 4 -histogram_sum{} 2.5 -histogram_count{} 1 +histogram_bucket{le="1"} 0 %[1]d +histogram_bucket{le="2"} 0 %[1]d +histogram_bucket{le="3"} 3 %[1]d +histogram_bucket{le="4"} 4 %[1]d +histogram_bucket{le="5"} 4 %[1]d +histogram_bucket{le="6"} 4 %[1]d +histogram_bucket{le="7"} 4 %[1]d +histogram_bucket{le="8"} 4 %[1]d +histogram_bucket{le="9"} 4 %[1]d +histogram_bucket{le="10"} 4 %[1]d +histogram_bucket{le="+Inf"} 4 %[1]d +histogram_sum{} 2.5 %[1]d +histogram_count{} 1 %[1]d ` in2 = ` # HELP gauge A gauge # TYPE gauge gauge -gauge 57 +gauge 57 %[1]d # HELP counter A counter # TYPE counter counter -counter 29 +counter 29 %[1]d # HELP histogram A histogram # TYPE histogram histogram -histogram_bucket{le="1"} 0 -histogram_bucket{le="2"} 0 -histogram_bucket{le="3"} 0 -histogram_bucket{le="4"} 4 -histogram_bucket{le="5"} 5 -histogram_bucket{le="6"} 5 -histogram_bucket{le="7"} 5 -histogram_bucket{le="8"} 5 -histogram_bucket{le="9"} 5 -histogram_bucket{le="10"} 5 -histogram_bucket{le="+Inf"} 5 -histogram_sum 4.5 -histogram_count 1 +histogram_bucket{le="1"} 0 %[1]d +histogram_bucket{le="2"} 0 %[1]d +histogram_bucket{le="3"} 0 %[1]d +histogram_bucket{le="4"} 4 %[1]d +histogram_bucket{le="5"} 5 %[1]d +histogram_bucket{le="6"} 5 %[1]d +histogram_bucket{le="7"} 5 %[1]d +histogram_bucket{le="8"} 5 %[1]d +histogram_bucket{le="9"} 5 %[1]d +histogram_bucket{le="10"} 5 %[1]d +histogram_bucket{le="+Inf"} 5 %[1]d +histogram_sum 4.5 %[1]d +histogram_count 1 %[1]d ` want = `# HELP counter A counter # TYPE counter counter -counter 60 +counter 60 %[1]d # HELP gauge A gauge # TYPE gauge gauge -gauge 99 +gauge 99 %[1]d # HELP histogram A histogram # TYPE histogram histogram -histogram_bucket{le="1"} 0 -histogram_bucket{le="2"} 0 -histogram_bucket{le="3"} 3 -histogram_bucket{le="4"} 8 -histogram_bucket{le="5"} 9 -histogram_bucket{le="6"} 9 -histogram_bucket{le="7"} 9 -histogram_bucket{le="8"} 9 -histogram_bucket{le="9"} 9 -histogram_bucket{le="10"} 9 -histogram_bucket{le="+Inf"} 9 -histogram_sum 7 -histogram_count 2 +histogram_bucket{le="1"} 0 %[1]d +histogram_bucket{le="2"} 0 %[1]d +histogram_bucket{le="3"} 3 %[1]d +histogram_bucket{le="4"} 8 %[1]d +histogram_bucket{le="5"} 9 %[1]d +histogram_bucket{le="6"} 9 %[1]d +histogram_bucket{le="7"} 9 %[1]d +histogram_bucket{le="8"} 9 %[1]d +histogram_bucket{le="9"} 9 %[1]d +histogram_bucket{le="10"} 9 %[1]d +histogram_bucket{le="+Inf"} 9 %[1]d +histogram_sum 7 %[1]d +histogram_count 2 %[1]d ` multilabel1 = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 1 +counter{a="a",b="b"} 1 %[1]d ` multilabel2 = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 2 +counter{a="a",b="b"} 2 %[1]d ` multilabelResult = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 3 +counter{a="a",b="b"} 3 %[1]d ` labelFields1 = `# HELP ui_page_render_errors A counter # TYPE ui_page_render_errors counter -ui_page_render_errors{path="/org/:orgId"} 1 -ui_page_render_errors{path="/prom/:orgId"} 1 +ui_page_render_errors{path="/org/:orgId"} 1 %[1]d +ui_page_render_errors{path="/prom/:orgId"} 1 %[1]d ` labelFields2 = `# HELP ui_page_render_errors A counter # TYPE ui_page_render_errors counter -ui_page_render_errors{path="/prom/:orgId"} 1 +ui_page_render_errors{path="/prom/:orgId"} 1 %[1]d ` labelFieldResult = `# HELP ui_page_render_errors A counter # TYPE ui_page_render_errors counter -ui_page_render_errors{path="/org/:orgId"} 1 -ui_page_render_errors{path="/prom/:orgId"} 2 +ui_page_render_errors{path="/org/:orgId"} 1 %[1]d +ui_page_render_errors{path="/prom/:orgId"} 2 %[1]d ` gaugeInput = ` # HELP ui_external_lib_loaded A gauge with entries in un-sorted order # TYPE ui_external_lib_loaded gauge -ui_external_lib_loaded{name="ga",loaded="true"} 1 -ui_external_lib_loaded{name="Intercom",loaded="true"} 1 -ui_external_lib_loaded{name="mixpanel",loaded="true"} 1 +ui_external_lib_loaded{name="ga",loaded="true"} 1 %[1]d +ui_external_lib_loaded{name="Intercom",loaded="true"} 1 %[1]d +ui_external_lib_loaded{name="mixpanel",loaded="true"} 1 %[1]d ` gaugeOutput = `# HELP ui_external_lib_loaded A gauge with entries in un-sorted order # TYPE ui_external_lib_loaded gauge -ui_external_lib_loaded{loaded="true",name="Intercom"} 2 -ui_external_lib_loaded{loaded="true",name="ga"} 2 -ui_external_lib_loaded{loaded="true",name="mixpanel"} 2 +ui_external_lib_loaded{loaded="true",name="Intercom"} 2 %[1]d +ui_external_lib_loaded{loaded="true",name="ga"} 2 %[1]d +ui_external_lib_loaded{loaded="true",name="mixpanel"} 2 %[1]d ` duplicateLabels = ` # HELP ui_external_lib_loaded Test with duplicate values @@ -128,19 +129,21 @@ ui_external_lib_loaded{name="Munchkin",loaded="true"} 1 reorderedLabels1 = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 1 +counter{a="a",b="b"} 1 %[1]d ` reorderedLabels2 = `# HELP counter A counter # TYPE counter counter -counter{b="b",a="a"} 2 +counter{b="b",a="a"} 2 %[1]d ` reorderedLabelsResult = `# HELP counter A counter # TYPE counter counter -counter{a="a",b="b"} 3 +counter{a="a",b="b"} 3 %[1]d ` ) func TestAggate(t *testing.T) { + now := time.Now().UnixNano() / int64(time.Millisecond) + for _, c := range []struct { a, b string want string @@ -154,17 +157,20 @@ func TestAggate(t *testing.T) { {duplicateLabels, "", "", fmt.Errorf("%s", duplicateError), nil}, {reorderedLabels1, reorderedLabels2, reorderedLabelsResult, nil, nil}, } { - a := newAggate() + a := newAggate(3600000) + if c.b != "" { + c.a, c.b, c.want = fmt.Sprintf(c.a, now), fmt.Sprintf(c.b, now), fmt.Sprintf(c.want, now) + } if err := a.parseAndMerge(strings.NewReader(c.a)); err != nil { if c.err1 == nil { - t.Fatalf("Unexpected error: %s", err) + t.Fatalf("Unexpected error: '%s'", err) } else if c.err1.Error() != err.Error() { - t.Fatalf("Expected %s, got %s", c.err1, err) + t.Fatalf("Expected '%s', got '%s'", c.err1, err) } } if err := a.parseAndMerge(strings.NewReader(c.b)); err != c.err2 { - t.Fatalf("Expected %s, got %s", c.err2, err) + t.Fatalf("Expected '%s', got '%s'", c.err2, err) } r := httptest.NewRequest("GET", "http://example.com/foo", nil)