Skip to content

Commit

Permalink
Add first semi-working aggregation implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Apr 24, 2018
1 parent 43c2dc0 commit 1355eca
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 29 deletions.
2 changes: 1 addition & 1 deletion lib/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ var (
HTTPReqDuration = stats.New("http_req_duration", stats.Trend, stats.Time)
HTTPReqBlocked = stats.New("http_req_blocked", stats.Trend, stats.Time)
HTTPReqConnecting = stats.New("http_req_connecting", stats.Trend, stats.Time)
HTTPReqTLSHandshaking = stats.New("http_req_tls_handshaking", stats.Trend, stats.Time)
HTTPReqSending = stats.New("http_req_sending", stats.Trend, stats.Time)
HTTPReqWaiting = stats.New("http_req_waiting", stats.Trend, stats.Time)
HTTPReqReceiving = stats.New("http_req_receiving", stats.Trend, stats.Time)
HTTPReqTLSHandshaking = stats.New("http_req_tls_handshaking", stats.Trend, stats.Time)

// Websocket-related
WSSessions = stats.New("ws_sessions", stats.Counter)
Expand Down
4 changes: 4 additions & 0 deletions lib/netext/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Trail struct {
StartTime time.Time
EndTime time.Time

// Total connect time (Connecting + TLSHandshaking)
ConnDuration time.Duration

// Total request duration, excluding DNS lookup and connect time.
Duration time.Duration

Expand Down Expand Up @@ -300,6 +303,7 @@ func (t *Tracer) Done() *Trail {

// Calculate total times using adjusted values.
trail.EndTime = done
trail.ConnDuration = trail.Connecting + trail.TLSHandshaking
trail.Duration = trail.Sending + trail.Waiting + trail.Receiving
trail.StartTime = trail.EndTime.Add(-trail.Duration)

Expand Down
139 changes: 125 additions & 14 deletions stats/cloud/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"context"
"encoding/json"
"path/filepath"
"sort"
"sync"
"time"

"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/netext"
"github.com/pkg/errors"

"gopkg.in/guregu/null.v3"

Expand All @@ -55,7 +57,7 @@ type Collector struct {
bufferHTTPTrails []*netext.Trail
bufferSamples []*Sample

aggBuckets map[int64]aggregationBucket
aggrBuckets map[int64]aggregationBucket
}

// Verify that Collector implements lib.Collector
Expand All @@ -69,6 +71,10 @@ func New(conf Config, src *lib.SourceData, opts lib.Options, version string) (*C
}
}

if conf.AggregationPeriod.Duration > 0 && (opts.SystemTags["vu"] || opts.SystemTags["iter"]) {
return nil, errors.New("aggregatoion cannot be enabled if the 'vu' or 'iter' system tag is also enabled")
}

if !conf.Name.Valid {
conf.Name = null.StringFrom(filepath.Base(src.Filename))
}
Expand All @@ -95,12 +101,12 @@ func New(conf Config, src *lib.SourceData, opts lib.Options, version string) (*C
}

return &Collector{
config: conf,
thresholds: thresholds,
client: NewClient(conf.Token.String, conf.Host.String, version),
anonymous: !conf.Token.Valid,
duration: duration,
aggBuckets: map[int64]aggregationBucket{},
config: conf,
thresholds: thresholds,
client: NewClient(conf.Token.String, conf.Host.String, version),
anonymous: !conf.Token.Valid,
duration: duration,
aggrBuckets: map[int64]aggregationBucket{},
}, nil
}

Expand Down Expand Up @@ -152,9 +158,9 @@ func (c *Collector) Run(ctx context.Context) {
for {
select {
case <-aggregationTicker.C:
c.aggregateHTTPTrails()
c.aggregateHTTPTrails(time.Duration(c.config.AggregationWaitPeriod.Duration))
case <-ctx.Done():
c.aggregateHTTPTrails()
c.aggregateHTTPTrails(0)
c.flushHTTPTrails()
c.pushMetrics()
wg.Done()
Expand Down Expand Up @@ -240,9 +246,114 @@ func (c *Collector) Collect(sampleContainers []stats.SampleContainer) {
}
}

func (c *Collector) aggregateHTTPTrails() {
//TODO, this is just a placeholder so I can commit without a broken build
c.flushHTTPTrails()
// aggregateHTTPTrails consumes the buffered HTTP trails, sorts them into
// time buckets and per-tag-set sub-buckets, and aggregates every bucket older
// than waitPeriod into a single "aggregated_trend" sample. Sub-buckets with
// fewer than AggregationMinSamples trails, as well as statistical outliers
// (outside the IQR-based bounds), are emitted as plain per-request samples.
// NOTE(review): divides by AggregationPeriod — assumes callers only invoke
// this when aggregation is enabled (period > 0); confirm in Run().
func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) {
	c.bufferMutex.Lock()
	newHTTPTrails := c.bufferHTTPTrails
	c.bufferHTTPTrails = nil
	c.bufferMutex.Unlock()

	aggrPeriod := int64(c.config.AggregationPeriod.Duration)

	// Distribute all newly buffered HTTP trails into buckets and sub-buckets
	for _, trail := range newHTTPTrails {
		trailTags := trail.GetTags()
		bucketID := trail.GetTime().UnixNano() / aggrPeriod

		// Get or create a time bucket for that trail period
		bucket, ok := c.aggrBuckets[bucketID]
		if !ok {
			bucket = aggregationBucket{}
			c.aggrBuckets[bucketID] = bucket
		}

		// Sub-buckets are keyed by *stats.SampleTags, so the direct map lookup
		// only hits when the exact same pointer was seen before; otherwise fall
		// back to a linear scan that compares the tag sets by value.
		subBucketKey := trailTags
		subBucket, ok := bucket[subBucketKey]
		if !ok {
			for sbTags, sb := range bucket {
				if trailTags.IsEqual(sbTags) {
					subBucketKey = sbTags
					subBucket = sb
					break // FIX: stop scanning once the matching sub-bucket is found
				}
			}
		}
		bucket[subBucketKey] = append(subBucket, trail)
	}

	// Which buckets are still new and we'll wait for trails to accumulate before aggregating
	bucketCutoffTime := time.Now().Add(-waitPeriod)
	bucketCutoffID := bucketCutoffTime.UnixNano() / aggrPeriod
	outliersCoef := c.config.AggregationOutliers.Float64
	newSamples := []*Sample{}

	// Handle all aggregation buckets older than bucketCutoffID
	for bucketID, subBuckets := range c.aggrBuckets {
		if bucketID > bucketCutoffID {
			continue // too new; let more trails accumulate first
		}

		for tags, httpTrails := range subBuckets {
			trailCount := int64(len(httpTrails))
			if trailCount < c.config.AggregationMinSamples.Int64 {
				// Too few samples to aggregate meaningfully - emit them raw.
				for _, trail := range httpTrails {
					newSamples = append(newSamples, NewSampleFromTrail(trail))
				}
				continue
			}

			// Sort the connection and request durations so the quartile-based
			// outlier bounds can be computed from them.
			connDurations := make(durations, trailCount)
			reqDurations := make(durations, trailCount)
			for i, trail := range httpTrails {
				connDurations[i] = trail.ConnDuration
				reqDurations[i] = trail.Duration
			}
			sort.Sort(connDurations)
			sort.Sort(reqDurations)

			minConnDur, maxConnDur := connDurations.GetNormalBounds(outliersCoef)
			minReqDur, maxReqDur := reqDurations.GetNormalBounds(outliersCoef)

			aggData := SampleDataAggregatedHTTPReqs{
				// Stamp the aggregated sample with the midpoint of its bucket.
				Time: time.Unix(0, bucketID*aggrPeriod+aggrPeriod/2),
				Type: "aggregated_trend",
				Tags: tags,
			}

			for _, trail := range httpTrails {
				if trail.ConnDuration < minConnDur ||
					trail.ConnDuration > maxConnDur ||
					trail.Duration < minReqDur ||
					trail.Duration > maxReqDur {

					// Outlier - track it separately as a regular sample.
					newSamples = append(newSamples, NewSampleFromTrail(trail))
				} else {
					aggData.Count++
					aggData.Values.Duration.Add(trail.Duration)
					aggData.Values.Blocked.Add(trail.Blocked)
					aggData.Values.Connecting.Add(trail.Connecting)
					aggData.Values.TLSHandshaking.Add(trail.TLSHandshaking)
					aggData.Values.Sending.Add(trail.Sending)
					aggData.Values.Waiting.Add(trail.Waiting)
					aggData.Values.Receiving.Add(trail.Receiving)
				}
			}
			aggData.CalcAverages()
			if aggData.Count > 0 {
				newSamples = append(newSamples, &Sample{
					Type:   "AggregatedPoints",
					Metric: "http_req_li_all",
					Data:   aggData,
				})
			}
		}
		delete(c.aggrBuckets, bucketID)
	}

	if len(newSamples) > 0 {
		c.bufferMutex.Lock()
		c.bufferSamples = append(c.bufferSamples, newSamples...)
		c.bufferMutex.Unlock()
	}
}

func (c *Collector) flushHTTPTrails() {
Expand All @@ -253,7 +364,7 @@ func (c *Collector) flushHTTPTrails() {
for _, trail := range c.bufferHTTPTrails {
newSamples = append(newSamples, NewSampleFromTrail(trail))
}
for _, bucket := range c.aggBuckets {
for _, bucket := range c.aggrBuckets {
for _, trails := range bucket {
for _, trail := range trails {
newSamples = append(newSamples, NewSampleFromTrail(trail))
Expand All @@ -262,7 +373,7 @@ func (c *Collector) flushHTTPTrails() {
}

c.bufferHTTPTrails = nil
c.aggBuckets = map[int64]aggregationBucket{}
c.aggrBuckets = map[int64]aggregationBucket{}
c.bufferSamples = append(c.bufferSamples, newSamples...)
}
func (c *Collector) pushMetrics() {
Expand Down
4 changes: 2 additions & 2 deletions stats/cloud/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Config struct {
// If aggregation is enabled, but the collected samples for a certain AggregationPeriod after AggregationPushDelay has passed are less than this number, they won't be aggregated.
AggregationMinSamples null.Int `json:"aggregationMinSamples" envconfig:"CLOUD_AGGREGATION_MIN_SAMPLES"`

// Which HTTP trails to consier non-aggregatable outliers.
// Which HTTP trails (how many IQRs below Q1 or above Q3) to consider non-aggregatable outliers.
AggregationOutliers null.Float `json:"aggregationOutliers" envconfig:"CLOUD_AGGREGATION_OUTLIERS"`
}

Expand All @@ -85,7 +85,7 @@ func NewConfig() Config {
AggregationCalcInterval: types.NullDurationFrom(3 * time.Second),
AggregationWaitPeriod: types.NullDurationFrom(5 * time.Second),
AggregationMinSamples: null.IntFrom(100),
//TODO: set default AggregationOutliers
AggregationOutliers: null.FloatFrom(1.5),
}
}

Expand Down
80 changes: 68 additions & 12 deletions stats/cloud/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/loadimpact/k6/lib/metrics"
"github.com/loadimpact/k6/lib/netext"

"github.com/loadimpact/k6/stats"
)

Expand Down Expand Up @@ -57,22 +56,79 @@ func NewSampleFromTrail(trail *netext.Trail) *Sample {
}
}

// SampleDataAggregatedMap is used in aggregated samples for HTTP requests.
type SampleDataAggregatedMap struct {
Time time.Time `json:"time"`
Type string `json:"type"`
Count uint64 `json:"count"`
Tags *stats.SampleTags `json:"tags,omitempty"`
Values map[string]AggregatedMetric `json:"values,omitempty"`
// SampleDataAggregatedHTTPReqs is used in aggregated samples for HTTP requests.
type SampleDataAggregatedHTTPReqs struct {
	// Time is the midpoint of the aggregation bucket this sample describes.
	Time time.Time `json:"time"`
	// Type of the aggregated sample; set to "aggregated_trend" by the collector.
	Type string `json:"type"`
	// Count is the number of HTTP trails folded into Values.
	Count uint64 `json:"count"`
	// Tags common to all of the aggregated trails.
	Tags *stats.SampleTags `json:"tags,omitempty"`
	// Values holds the aggregated min/max/avg stats for each HTTP sub-metric,
	// keyed in JSON by the corresponding built-in metric name.
	Values struct {
		Duration       AggregatedMetric `json:"http_req_duration"`
		Blocked        AggregatedMetric `json:"http_req_blocked"`
		Connecting     AggregatedMetric `json:"http_req_connecting"`
		TLSHandshaking AggregatedMetric `json:"http_req_tls_handshaking"`
		Sending        AggregatedMetric `json:"http_req_sending"`
		Waiting        AggregatedMetric `json:"http_req_waiting"`
		Receiving      AggregatedMetric `json:"http_req_receiving"`
	} `json:"values"`
}

// CalcAverages calculates and sets all `Avg` properties in the `Values` struct.
// It should be called after all trails have been Add()-ed to the Values
// metrics. If Count is zero it does nothing - previously every Avg would have
// been set to NaN (0/0), which would then leak into the JSON output.
func (sdagg *SampleDataAggregatedHTTPReqs) CalcAverages() {
	if sdagg.Count == 0 {
		return // nothing was aggregated; avoid division by zero
	}
	count := float64(sdagg.Count)
	sdagg.Values.Duration.Avg = float64(sdagg.Values.Duration.sum) / count
	sdagg.Values.Blocked.Avg = float64(sdagg.Values.Blocked.sum) / count
	sdagg.Values.Connecting.Avg = float64(sdagg.Values.Connecting.sum) / count
	sdagg.Values.TLSHandshaking.Avg = float64(sdagg.Values.TLSHandshaking.sum) / count
	sdagg.Values.Sending.Avg = float64(sdagg.Values.Sending.sum) / count
	sdagg.Values.Waiting.Avg = float64(sdagg.Values.Waiting.sum) / count
	sdagg.Values.Receiving.Avg = float64(sdagg.Values.Receiving.sum) / count
}

// AggregatedMetric is used to store aggregated information for a
// particular metric in an SampleDataAggregatedMap.
type AggregatedMetric struct {
Min float64 `json:"min"`
Max float64 `json:"max"`
Avg float64 `json:"avg"`
StdDev float64 `json:"stddev"`
Min time.Duration `json:"min"`
Max time.Duration `json:"max"`
sum time.Duration `json:"-"` // ignored in JSON output because of SampleDataAggregatedHTTPReqs.Count
Avg float64 `json:"avg"` // not updated automatically, has to be set externally
}

// Add the new duration to the internal sum and update Min and Max if necessary
func (am *AggregatedMetric) Add(t time.Duration) {
if am.sum == 0 || am.Min > t {
am.Min = t
}
if am.Max < t {
am.Max = t
}
am.sum += t
}

// aggregationBucket groups the HTTP trails of one aggregation time bucket by
// their tag set. Keys are pointers, so a direct map lookup only matches the
// identical *stats.SampleTags value; aggregateHTTPTrails() additionally
// compares keys by value with SampleTags.IsEqual() when inserting.
type aggregationBucket map[*stats.SampleTags][]*netext.Trail

type durations []time.Duration

func (d durations) Len() int { return len(d) }
func (d durations) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d durations) Less(i, j int) bool { return d[i] < d[j] }
func (d durations) GetNormalBounds(iqrCoef float64) (min, max time.Duration) {
l := len(d)
if l == 0 {
return
}

var q1, q3 time.Duration
if l%4 == 0 {
q1 = d[l/4]
q3 = d[(l/4)*3]
} else {
q1 = (d[l/4] + d[(l/4)+1]) / 2
q3 = (d[(l/4)*3] + d[(l/4)*3+1]) / 2
}

iqr := float64(q3 - q1)
min = q1 - time.Duration(iqrCoef*iqr)
max = q3 + time.Duration(iqrCoef*iqr)
return
}

0 comments on commit 1355eca

Please sign in to comment.