Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the right implementation for saving gauge metrics #56

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 167 additions & 82 deletions cmd/prom-aggregation-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"net/http"
"os"
"regexp"
"sort"
"sync"

Expand All @@ -14,6 +18,36 @@ import (
"github.com/prometheus/common/model"
)

// Package-level loggers, one per severity. All four are nil until Init has
// been called, so Init must run before anything logs through them.
var (
	// Trace is the logger whose lines are prefixed with "TRACE: ".
	Trace *log.Logger
	// Info is the logger whose lines are prefixed with "INFO: ".
	Info *log.Logger
	// Warning is the logger whose lines are prefixed with "WARNING: ".
	Warning *log.Logger
	// Error is the logger whose lines are prefixed with "ERROR: ".
	Error *log.Logger
)

// Init (re)creates the Trace, Info, Warning and Error loggers so that each one
// writes to the corresponding handle. Every logger uses the same layout: its
// severity prefix followed by the date, the time and the short file name and
// line of the call site.
func Init(traceHandle, infoHandle, warningHandle, errorHandle io.Writer) {
	// Shared by all four loggers so their output lines up.
	const layout = log.Ldate | log.Ltime | log.Lshortfile

	levels := []struct {
		logger **log.Logger
		sink   io.Writer
		prefix string
	}{
		{&Trace, traceHandle, "TRACE: "},
		{&Info, infoHandle, "INFO: "},
		{&Warning, warningHandle, "WARNING: "},
		{&Error, errorHandle, "ERROR: "},
	}
	for _, lvl := range levels {
		*lvl.logger = log.New(lvl.sink, lvl.prefix, layout)
	}
}

func lablesLessThan(a, b []*dto.LabelPair) bool {
i, j := 0, 0
for i < len(a) && j < len(b) {
Expand Down Expand Up @@ -78,24 +112,42 @@ func mergeBuckets(a, b []*dto.Bucket) []*dto.Bucket {
return output
}

func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
func mergeMetric(ty dto.MetricType, gaugeAggRule string, count int, a, b *dto.Metric) *dto.Metric {
switch ty {
case dto.MetricType_COUNTER:
return &dto.Metric{
Label: a.Label,
Counter: &dto.Counter{
// TODO: how would this work in a multiple instance scenario?
Value: float64ptr(*a.Counter.Value + *b.Counter.Value),
},
}

case dto.MetricType_GAUGE:
// No very meaningful way for us to merge gauges. We'll sum them
// and clear out any gauges on scrape, as a best approximation, but
// this relies on client pushing with the same interval as we scrape.
if gaugeAggRule == "" {
gaugeAggRule = "last"
}
var value float64
switch gaugeAggRule {
case "max":
value = math.Max(*a.Gauge.Value, *b.Gauge.Value)
case "min":
value = math.Min(*a.Gauge.Value, *b.Gauge.Value)
case "sum":
value = *a.Gauge.Value + *b.Gauge.Value
case "avg":
value = (*a.Gauge.Value*(float64(count)-1) + *b.Gauge.Value) / float64(count)
case "last":
value = *b.Gauge.Value
case "first":
value = *a.Gauge.Value
}
Trace.Printf("gauge aggregation: old=%.1f new=%.1f agg=%s out=%.1f", *a.Gauge.Value, *b.Gauge.Value, gaugeAggRule, value)
// Average out value
return &dto.Metric{
Label: a.Label,
Gauge: &dto.Gauge{
Value: float64ptr(*a.Gauge.Value + *b.Gauge.Value),
Value: float64ptr(value),
},
}

Expand All @@ -113,7 +165,7 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
return &dto.Metric{
Label: a.Label,
Untyped: &dto.Untyped{
Value: float64ptr(*a.Untyped.Value + *b.Untyped.Value),
Value: float64ptr((*a.Untyped.Value*(float64(count)-1) + *b.Untyped.Value) / float64(count)),
},
}

Expand All @@ -125,111 +177,134 @@ func mergeMetric(ty dto.MetricType, a, b *dto.Metric) *dto.Metric {
return nil
}

func mergeFamily(a, b *dto.MetricFamily) (*dto.MetricFamily, error) {
if *a.Type != *b.Type {
return nil, fmt.Errorf("Cannot merge metric '%s': type %s != %s",
*a.Name, a.Type.String(), b.Type.String())
}
// Takes a new family (nf) and adds it to an existing family (ef)
func (a *aggate) mergeFamily(nf *dto.MetricFamily) error {

output := &dto.MetricFamily{
Name: a.Name,
Help: a.Help,
Type: a.Type,
}
metrics := make(map[model.Fingerprint]*dto.Metric)

i, j := 0, 0
for i < len(a.Metric) && j < len(b.Metric) {
if lablesLessThan(a.Metric[i].Label, b.Metric[j].Label) {
output.Metric = append(output.Metric, a.Metric[i])
i++
} else if lablesLessThan(b.Metric[j].Label, a.Metric[i].Label) {
output.Metric = append(output.Metric, b.Metric[j])
j++
} else {
merged := mergeMetric(*a.Type, a.Metric[i], b.Metric[j])
if merged != nil {
output.Metric = append(output.Metric, merged)
// Add existing metrics
ef, ok := a.families[*nf.Name]
if ok {

// Check the metric types
if *ef.Type != *nf.Type {
return fmt.Errorf("Cannot merge metric '%s': type %s != %s",
*ef.Name, ef.Type.String(), nf.Type.String())
}

for _, m := range ef.Metric {
fp, err := fingerprint(*ef.Name, m)
if err != nil {
return err
}
metrics[fp] = m
}
} else {
if *nf.Type == dto.MetricType_GAUGE {
help := nf.GetHelp()
pat := regexp.MustCompile(`<gauge:agg:(\w+)>`)
matches := pat.FindStringSubmatch(help)
if len(matches) > 1 {
Trace.Printf("Found aggregation for gauge: %s, %s", *nf.Name, matches[1])
a.gaugeAggRules[*nf.Name] = matches[1]
}
i++
j++
}
}
for ; i < len(a.Metric); i++ {
output.Metric = append(output.Metric, a.Metric[i])

// Merge or add new Metrics
for _, m := range nf.Metric {

fp, err := fingerprint(*nf.Name, m)
if err != nil {
return err
}
// Add count to fingerprints
a.fingerprintCounts[fp]++

oldMetric, ok := metrics[fp]
if ok {
metrics[fp] = mergeMetric(*nf.Type, a.gaugeAggRules[*nf.Name], a.fingerprintCounts[fp], oldMetric, m)
} else {
metrics[fp] = m
}
}
for ; j < len(b.Metric); j++ {
output.Metric = append(output.Metric, b.Metric[j])

// Add the metrics back
nf.Metric = []*dto.Metric{}
for _, m := range metrics {

sort.Sort(byName(m.Label)) // Sort metrics labels
nf.Metric = append(nf.Metric, m)
}
return output, nil
}

type aggate struct {
familiesLock sync.RWMutex
families map[string]*dto.MetricFamily
sort.Sort(byLabel(nf.Metric)) // Sort metrics
a.families[*nf.Name] = nf

return nil
}

func newAggate() *aggate {
return &aggate{
families: map[string]*dto.MetricFamily{},
// fingerprint returns the fingerprint of metric m inside the family called
// name. The fingerprint is computed over m's label pairs plus the metric-name
// label set to name. If the resulting label set fails validation, the
// validation error is returned together with a zero fingerprint.
func fingerprint(name string, m *dto.Metric) (f model.Fingerprint, err error) {
	labels := make(model.LabelSet, len(m.Label)+1)
	for _, pair := range m.Label {
		key := model.LabelName(pair.GetName())
		labels[key] = model.LabelValue(pair.GetValue())
	}
	// Written after the loop so the family name replaces any label in m that
	// uses the same key.
	labels[model.MetricNameLabel] = model.LabelValue(name)

	if invalid := labels.Validate(); invalid != nil {
		return 0, invalid
	}
	return labels.Fingerprint(), nil
}

func validateFamily(f *dto.MetricFamily) error {
// Map of fingerprints we've seen before in this family
fingerprints := make(map[model.Fingerprint]struct{}, len(f.Metric))
for _, m := range f.Metric {
// Turn protobuf LabelSet into Prometheus model LabelSet
lset := make(model.LabelSet, len(m.Label)+1)
for _, p := range m.Label {
lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
if err := lset.Validate(); err != nil {

fingerprint, err := fingerprint(f.GetName(), m)
if err != nil {
return err
}
fingerprint := lset.Fingerprint()
if _, found := fingerprints[fingerprint]; found {
return fmt.Errorf("Duplicate labels: %v", lset)
return fmt.Errorf("Duplicate labels: %v", m)
}
fingerprints[fingerprint] = struct{}{}
}
return nil
}

// aggate holds the aggregated state of the gateway: every metric family that
// has been pushed so far plus the bookkeeping needed to merge further pushes.
type aggate struct {
	// Embedded lock; parseAndMerge and handler hold it while they read or
	// modify the maps below.
	sync.RWMutex

	// families maps a metric family name to its current aggregated family.
	families map[string]*dto.MetricFamily

	// gaugeAggRules maps a gauge family name to the aggregation rule found in
	// a "<gauge:agg:RULE>" tag of that family's help text.
	gaugeAggRules map[string]string

	// fingerprintCounts counts how many times each metric fingerprint has been
	// merged; the count feeds the running-average merge and the whole map is
	// replaced with an empty one by handler.
	fingerprintCounts map[model.Fingerprint]int
}

// newAggate returns an aggate with all of its maps allocated and empty, so it
// can be written to immediately.
func newAggate() *aggate {
	a := new(aggate)
	a.families = make(map[string]*dto.MetricFamily)
	a.gaugeAggRules = make(map[string]string)
	a.fingerprintCounts = make(map[model.Fingerprint]int)
	return a
}

func (a *aggate) parseAndMerge(r io.Reader) error {
var parser expfmt.TextParser
inFamilies, err := parser.TextToMetricFamilies(r)
if err != nil {
return err
}

a.familiesLock.Lock()
defer a.familiesLock.Unlock()
for name, family := range inFamilies {
// Sort labels in case source sends them inconsistently
for _, m := range family.Metric {
sort.Sort(byName(m.Label))
}
a.Lock()
defer a.Unlock()
for _, family := range inFamilies {

if err := validateFamily(family); err != nil {
return err
}

// family must be sorted for the merge
sort.Sort(byLabel(family.Metric))

existingFamily, ok := a.families[name]
if !ok {
a.families[name] = family
continue
}

merged, err := mergeFamily(existingFamily, family)
if err != nil {
if err := a.mergeFamily(family); err != nil {
return err
}

a.families[name] = merged
}

return nil
Expand All @@ -240,8 +315,9 @@ func (a *aggate) handler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", string(contentType))
enc := expfmt.NewEncoder(w, contentType)

a.familiesLock.RLock()
defer a.familiesLock.RUnlock()
a.Lock()
defer a.Unlock()

metricNames := []string{}
for name := range a.families {
metricNames = append(metricNames, name)
Expand All @@ -255,25 +331,34 @@ func (a *aggate) handler(w http.ResponseWriter, r *http.Request) {
}
}

// TODO reset gauges
// reset counts for gauge averages
a.fingerprintCounts = make(map[model.Fingerprint]int)
}

func handleHealthCheck(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
io.WriteString(w, `{"alive": true}`)
// readyHandler answers every request with HTTP 200 and the plain body "OK".
// The request itself is not inspected.
func readyHandler(w http.ResponseWriter, r *http.Request) {
	const body = "OK"

	w.WriteHeader(http.StatusOK)
	// As before, a failed write to the client is deliberately not handled.
	_, _ = w.Write([]byte(body))
}
// healthyHandler answers every request with HTTP 200 and the plain body "OK".
// The request itself is not inspected.
func healthyHandler(w http.ResponseWriter, r *http.Request) {
	const body = "OK"

	w.WriteHeader(http.StatusOK)
	// As before, a failed write to the client is deliberately not handled.
	_, _ = w.Write([]byte(body))
}

func main() {
listen := flag.String("listen", ":80", "Address and port to listen on.")
Init(ioutil.Discard, os.Stdout, os.Stdout, os.Stderr)
Info.Println("Starting main")
listen := flag.String("listen", ":9091", "Address and port to listen on.")
cors := flag.String("cors", "*", "The 'Access-Control-Allow-Origin' value to be returned.")
pushPath := flag.String("push-path", "/metrics/", "HTTP path to accept pushed metrics.")
flag.Parse()

a := newAggate()
Info.Printf("Listening on %s\n", *listen)
http.HandleFunc("/metrics", a.handler)
http.HandleFunc("/-/healthy", handleHealthCheck)
http.HandleFunc("/-/ready", handleHealthCheck)

http.HandleFunc("/-/healthy", healthyHandler)
http.HandleFunc("/-/ready", readyHandler)

http.HandleFunc(*pushPath, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", *cors)
if err := a.parseAndMerge(r.Body); err != nil {
Expand All @@ -282,5 +367,5 @@ func main() {
return
}
})
log.Fatal(http.ListenAndServe(*listen, nil))
Info.Println(http.ListenAndServe(*listen, nil))
}
Loading