-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathinstrumented_composite.go
108 lines (88 loc) · 2.96 KB
/
instrumented_composite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright (c) The EfficientGo Authors.
// Licensed under the Apache License 2.0.
package e2emon
import (
"context"
"time"
"github.com/efficientgo/core/backoff"
"github.com/efficientgo/core/errors"
)
// CompositeInstrumentedRunnable abstracts a higher-level service composed of more than one
// InstrumentedRunnable, so that the group can be queried as a single unit.
type CompositeInstrumentedRunnable struct {
// runnables holds the instances the composite was constructed with, in the order they were given.
runnables []*InstrumentedRunnable
// Generic retry backoff. It is the default wait backoff of the metric-waiting methods
// unless a MetricsOption replaces it.
backoff *backoff.Backoff
}
// NewCompositeInstrumentedRunnable bundles the given runnables into a single composite.
//
// The composite is created with its own retry backoff (300ms to 600ms between attempts,
// at most 50 retries) bound to a background context.
func NewCompositeInstrumentedRunnable(runnables ...*InstrumentedRunnable) *CompositeInstrumentedRunnable {
	retryCfg := backoff.Config{
		Min:        300 * time.Millisecond,
		Max:        600 * time.Millisecond,
		MaxRetries: 50, // Sometimes the CI is slow ¯\_(ツ)_/¯
	}

	composite := &CompositeInstrumentedRunnable{runnables: runnables}
	composite.backoff = backoff.New(context.Background(), retryCfg)
	return composite
}
// Instances returns the InstrumentedRunnable instances this composite is made of.
// The returned slice is the composite's own slice, not a copy.
func (r *CompositeInstrumentedRunnable) Instances() []*InstrumentedRunnable {
return r.runnables
}
// MetricTargets returns the metric targets of every instance, concatenated in instance order.
// The result is nil when no instance reports any target.
func (r *CompositeInstrumentedRunnable) MetricTargets() (ret []Target) {
	for i := range r.runnables {
		targets := r.runnables[i].MetricTargets()
		ret = append(ret, targets...)
	}
	return ret
}
// buildMetricsOptions returns the effective metrics options: the defaults (getMetricValue as
// the value getter and the composite's own backoff as the wait backoff) with every given
// option applied on top, in order.
func (r *CompositeInstrumentedRunnable) buildMetricsOptions(opts []MetricsOption) metricsOptions {
	effective := metricsOptions{
		getValue:    getMetricValue,
		waitBackoff: r.backoff,
	}
	// Later options win because each one mutates the same struct.
	for i := range opts {
		opts[i](&effective)
	}
	return effective
}
// WaitSumMetrics waits for at least one instance of each given metric name to be present and
// for their sums to satisfy the expectation, i.e. for expected(...) to return true when it is
// passed the sums. It delegates to WaitSumMetricsWithOptions without any extra options.
func (r *CompositeInstrumentedRunnable) WaitSumMetrics(expected MetricValueExpectation, metricNames ...string) error {
return r.WaitSumMetricsWithOptions(expected, metricNames)
}
// WaitSumMetricsWithOptions repeatedly calls SumMetrics until expected(sums...) returns true,
// SumMetrics fails with an error that is not retried, or the wait backoff is exhausted.
//
// The wait backoff defaults to the composite's own backoff and can be replaced through opts;
// the same opts are forwarded to SumMetrics on every attempt. An error matching
// errMissingMetric is retried only when the waitMissingMetrics option is set; any other error
// is returned right away.
//
// It returns nil as soon as the expectation holds. When the backoff runs out it always returns
// a non-nil error naming the metrics and the last observed sums, wrapping the last SumMetrics
// error if there was one.
func (r *CompositeInstrumentedRunnable) WaitSumMetricsWithOptions(expected MetricValueExpectation, metricNames []string, opts ...MetricsOption) error {
	var (
		sums    []float64
		err     error
		options = r.buildMetricsOptions(opts)
	)

	for options.waitBackoff.Reset(); options.waitBackoff.Ongoing(); {
		sums, err = r.SumMetrics(metricNames, opts...)
		if options.waitMissingMetrics && errors.Is(err, errMissingMetric) {
			options.waitBackoff.Wait()
			continue
		}
		if err != nil {
			return err
		}
		if expected(sums...) {
			return nil
		}
		options.waitBackoff.Wait()
	}

	const timeoutFormat = "unable to find metrics %s with expected values. Last values: %v"

	// The last attempt may have succeeded without meeting the expectation, in which case there
	// is no cause to wrap. Build the error directly instead of depending on how Wrapf treats a
	// nil cause, so that a timed-out wait can never be reported as success.
	if err == nil {
		return errors.Newf(timeoutFormat, metricNames, sums)
	}
	return errors.Wrapf(err, timeoutFormat, metricNames, sums)
}
// SumMetrics returns, for each given metric name, the sum of that metric across all instances.
// The result has one entry per metric name, in the same order.
//
// The first error reported by an instance aborts the aggregation and is returned as is. An
// instance returning a different number of values than metric names is reported as an error too.
func (r *CompositeInstrumentedRunnable) SumMetrics(metricNames []string, opts ...MetricsOption) ([]float64, error) {
	want := len(metricNames)
	totals := make([]float64, want)

	for _, runnable := range r.runnables {
		partials, err := runnable.SumMetrics(metricNames, opts...)
		switch {
		case err != nil:
			return nil, err
		case len(partials) != want:
			return nil, errors.Newf("unexpected mismatching sum metrics results (got %d, expected %d)", len(partials), want)
		}

		// The lengths match at this point, so every index is valid for totals.
		for i, value := range partials {
			totals[i] += value
		}
	}
	return totals, nil
}