diff --git a/lib/metrics/metrics.go b/lib/metrics/metrics.go
index 20fd28cbaf0..d550d762bd1 100644
--- a/lib/metrics/metrics.go
+++ b/lib/metrics/metrics.go
@@ -43,10 +43,10 @@ var (
 	HTTPReqDuration       = stats.New("http_req_duration", stats.Trend, stats.Time)
 	HTTPReqBlocked        = stats.New("http_req_blocked", stats.Trend, stats.Time)
 	HTTPReqConnecting     = stats.New("http_req_connecting", stats.Trend, stats.Time)
+	HTTPReqTLSHandshaking = stats.New("http_req_tls_handshaking", stats.Trend, stats.Time)
 	HTTPReqSending        = stats.New("http_req_sending", stats.Trend, stats.Time)
 	HTTPReqWaiting        = stats.New("http_req_waiting", stats.Trend, stats.Time)
 	HTTPReqReceiving      = stats.New("http_req_receiving", stats.Trend, stats.Time)
-	HTTPReqTLSHandshaking = stats.New("http_req_tls_handshaking", stats.Trend, stats.Time)
 
 	// Websocket-related
 	WSSessions = stats.New("ws_sessions", stats.Counter)
diff --git a/lib/netext/tracer.go b/lib/netext/tracer.go
index 0f35b809556..e1803388c69 100644
--- a/lib/netext/tracer.go
+++ b/lib/netext/tracer.go
@@ -38,6 +38,9 @@ type Trail struct {
 	StartTime time.Time
 	EndTime   time.Time
 
+	// Total connect time (Connecting + TLSHandshaking)
+	ConnDuration time.Duration
+
 	// Total request duration, excluding DNS lookup and connect time.
 	Duration time.Duration
 
@@ -300,6 +303,7 @@ func (t *Tracer) Done() *Trail {
 	// Calculate total times using adjusted values.
 	trail.EndTime = done
+	trail.ConnDuration = trail.Connecting + trail.TLSHandshaking
 	trail.Duration = trail.Sending + trail.Waiting + trail.Receiving
 	trail.StartTime = trail.EndTime.Add(-trail.Duration)
diff --git a/stats/cloud/collector.go b/stats/cloud/collector.go
index bc45b6a1ed9..1a0db72a35a 100644
--- a/stats/cloud/collector.go
+++ b/stats/cloud/collector.go
@@ -24,11 +24,13 @@ import (
 	"context"
 	"encoding/json"
 	"path/filepath"
+	"sort"
 	"sync"
 	"time"
 
 	"github.com/loadimpact/k6/lib/metrics"
 	"github.com/loadimpact/k6/lib/netext"
+	"github.com/pkg/errors"
 
 	"gopkg.in/guregu/null.v3"
@@ -55,7 +57,7 @@ type Collector struct {
 	bufferHTTPTrails []*netext.Trail
 	bufferSamples    []*Sample
 
-	aggBuckets map[int64]aggregationBucket
+	aggrBuckets map[int64]aggregationBucket
 }
 
 // Verify that Collector implements lib.Collector
@@ -69,6 +71,10 @@ func New(conf Config, src *lib.SourceData, opts lib.Options, version string) (*C
 		}
 	}
 
+	if conf.AggregationPeriod.Duration > 0 && (opts.SystemTags["vu"] || opts.SystemTags["iter"]) {
+		return nil, errors.New("aggregation cannot be enabled if the 'vu' or 'iter' system tag is also enabled")
+	}
+
 	if !conf.Name.Valid {
 		conf.Name = null.StringFrom(filepath.Base(src.Filename))
 	}
@@ -95,12 +101,12 @@ func New(conf Config, src *lib.SourceData, opts lib.Options, version string) (*C
 	}
 
 	return &Collector{
-		config:     conf,
-		thresholds: thresholds,
-		client:     NewClient(conf.Token.String, conf.Host.String, version),
-		anonymous:  !conf.Token.Valid,
-		duration:   duration,
-		aggBuckets: map[int64]aggregationBucket{},
+		config:      conf,
+		thresholds:  thresholds,
+		client:      NewClient(conf.Token.String, conf.Host.String, version),
+		anonymous:   !conf.Token.Valid,
+		duration:    duration,
+		aggrBuckets: map[int64]aggregationBucket{},
 	}, nil
 }
@@ -152,9 +158,9 @@ func (c *Collector) Run(ctx context.Context) {
 	for {
 		select {
 		case <-aggregationTicker.C:
-			c.aggregateHTTPTrails()
+			c.aggregateHTTPTrails(time.Duration(c.config.AggregationWaitPeriod.Duration))
 		case <-ctx.Done():
-			c.aggregateHTTPTrails()
+			c.aggregateHTTPTrails(0)
 			c.flushHTTPTrails()
 			c.pushMetrics()
 			wg.Done()
@@ -240,9 +246,114 @@ func (c *Collector) Collect(sampleContainers []stats.SampleContainer) {
 	}
 }
 
-func (c *Collector) aggregateHTTPTrails() {
-	//TODO, this is just a placeholder so I can commit without a broken build
-	c.flushHTTPTrails()
+func (c *Collector) aggregateHTTPTrails(waitPeriod time.Duration) {
+	c.bufferMutex.Lock()
+	newHTTPTrails := c.bufferHTTPTrails
+	c.bufferHTTPTrails = nil
+	c.bufferMutex.Unlock()
+
+	aggrPeriod := int64(c.config.AggregationPeriod.Duration)
+
+	// Distribute all newly buffered HTTP trails into buckets and sub-buckets
+	for _, trail := range newHTTPTrails {
+		trailTags := trail.GetTags()
+		bucketID := trail.GetTime().UnixNano() / aggrPeriod
+
+		// Get or create a time bucket for that trail period
+		bucket, ok := c.aggrBuckets[bucketID]
+		if !ok {
+			bucket = aggregationBucket{}
+			c.aggrBuckets[bucketID] = bucket
+		}
+
+		// Reuse an existing sub-bucket key with an equal tag set (keys are pointers, so they have to be compared by value), or use the trail tags as a new key
+		subBucketKey := trailTags
+		subBucket, ok := bucket[subBucketKey]
+		if !ok {
+			for sbTags, sb := range bucket {
+				if trailTags.IsEqual(sbTags) {
+					subBucketKey = sbTags
+					subBucket = sb
+				}
+			}
+		}
+		bucket[subBucketKey] = append(subBucket, trail)
+	}
+
+	// Buckets newer than this cutoff are left alone, so trails can keep accumulating in them before they are aggregated
+	bucketCutoffTime := time.Now().Add(-waitPeriod)
+	bucketCutoffID := int64(bucketCutoffTime.UnixNano()) / aggrPeriod
+	outliersCoef := c.config.AggregationOutliers.Float64
+	newSamples := []*Sample{}
+
+	// Handle all aggregation buckets older than bucketCutoffID
+	for bucketID, subBuckets := range c.aggrBuckets {
+		if bucketID > bucketCutoffID {
+			continue
+		}
+
+		for tags, httpTrails := range subBuckets {
+			trailCount := int64(len(httpTrails))
+			if trailCount < c.config.AggregationMinSamples.Int64 {
+				for _, trail := range httpTrails {
+					newSamples = append(newSamples, NewSampleFromTrail(trail))
+				}
+				continue
+			}
+
+			connDurations := make(durations, trailCount)
+			reqDurations := make(durations, trailCount)
+			for i, trail := range httpTrails {
+				connDurations[i] = trail.ConnDuration
+				reqDurations[i] = trail.Duration
+			}
+			sort.Sort(connDurations)
+			sort.Sort(reqDurations)
+
+			minConnDur, maxConnDur := connDurations.GetNormalBounds(outliersCoef)
+			minReqDur, maxReqDur := reqDurations.GetNormalBounds(outliersCoef)
+
+			aggData := SampleDataAggregatedHTTPReqs{
+				Time: time.Unix(0, bucketID*aggrPeriod+aggrPeriod/2),
+				Type: "aggregated_trend",
+				Tags: tags,
+			}
+
+			for _, trail := range httpTrails {
+				if trail.ConnDuration < minConnDur ||
+					trail.ConnDuration > maxConnDur ||
+					trail.Duration < minReqDur ||
+					trail.Duration > maxReqDur {
+					// The trail is an outlier, send it as an individual sample
+					newSamples = append(newSamples, NewSampleFromTrail(trail))
+				} else {
+					aggData.Count++
+					aggData.Values.Duration.Add(trail.Duration)
+					aggData.Values.Blocked.Add(trail.Blocked)
+					aggData.Values.Connecting.Add(trail.Connecting)
+					aggData.Values.TLSHandshaking.Add(trail.TLSHandshaking)
+					aggData.Values.Sending.Add(trail.Sending)
+					aggData.Values.Waiting.Add(trail.Waiting)
+					aggData.Values.Receiving.Add(trail.Receiving)
+				}
+			}
+			aggData.CalcAverages()
+			if aggData.Count > 0 {
+				newSamples = append(newSamples, &Sample{
+					Type:   "AggregatedPoints",
+					Metric: "http_req_li_all",
+					Data:   aggData,
+				})
+			}
+		}
+		delete(c.aggrBuckets, bucketID)
+	}
+
+	if len(newSamples) > 0 {
+		c.bufferMutex.Lock()
+		c.bufferSamples = append(c.bufferSamples, newSamples...)
+		c.bufferMutex.Unlock()
+	}
 }
 
 func (c *Collector) flushHTTPTrails() {
@@ -253,7 +364,7 @@ func (c *Collector) flushHTTPTrails() {
 	for _, trail := range c.bufferHTTPTrails {
 		newSamples = append(newSamples, NewSampleFromTrail(trail))
 	}
-	for _, bucket := range c.aggBuckets {
+	for _, bucket := range c.aggrBuckets {
 		for _, trails := range bucket {
 			for _, trail := range trails {
 				newSamples = append(newSamples, NewSampleFromTrail(trail))
@@ -262,7 +373,7 @@
 	}
 
 	c.bufferHTTPTrails = nil
-	c.aggBuckets = map[int64]aggregationBucket{}
+	c.aggrBuckets = map[int64]aggregationBucket{}
 	c.bufferSamples = append(c.bufferSamples, newSamples...)
 }
 func (c *Collector) pushMetrics() {
diff --git a/stats/cloud/config.go b/stats/cloud/config.go
index b40abbfff2e..1237c582b45 100644
--- a/stats/cloud/config.go
+++ b/stats/cloud/config.go
@@ -72,7 +72,7 @@ type Config struct {
 	// If aggregation is enabled, but the collected samples for a certain AggregationPeriod after AggregationPushDelay has passed are less than this number, they won't be aggregated.
 	AggregationMinSamples null.Int `json:"aggregationMinSamples" envconfig:"CLOUD_AGGREGATION_MIN_SAMPLES"`
 
-	// Which HTTP trails to consier non-aggregatable outliers.
+	// Which HTTP trails (how many IQRs below Q1 or above Q3) to consider non-aggregatable outliers.
 	AggregationOutliers null.Float `json:"aggregationOutliers" envconfig:"CLOUD_AGGREGATION_OUTLIERS"`
 }
 
@@ -85,7 +85,7 @@ func NewConfig() Config {
 		AggregationCalcInterval: types.NullDurationFrom(3 * time.Second),
 		AggregationWaitPeriod:   types.NullDurationFrom(5 * time.Second),
 		AggregationMinSamples:   null.IntFrom(100),
-		//TODO: set default AggregationOutliers
+		AggregationOutliers:     null.FloatFrom(1.5),
 	}
 }
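
The defaults above (5s wait period, 100 minimum samples, 1.5 IQR coefficient) drive the bucketing in aggregateHTTPTrails. Here is a minimal standalone sketch (not part of the patch) of that time-bucketing scheme; the 1-second aggregation period is purely an illustrative assumption, since this diff doesn't show the AggregationPeriod default:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values: the wait period matches NewConfig() above, the
	// 1s aggregation period is just an assumption for this sketch.
	aggrPeriod := int64(1 * time.Second)
	waitPeriod := 5 * time.Second

	// Integer division maps every trail timestamp within the same period to
	// the same bucket ID, like bucketID := trail.GetTime().UnixNano() / aggrPeriod.
	trailTime := time.Now()
	bucketID := trailTime.UnixNano() / aggrPeriod

	// Buckets newer than the cutoff keep accumulating trails; older buckets
	// are aggregated and deleted on the next tick.
	bucketCutoffID := time.Now().Add(-waitPeriod).UnixNano() / aggrPeriod
	fmt.Println("ripe for aggregation:", bucketID <= bucketCutoffID) // false for a fresh trail

	// Aggregated samples are stamped with the midpoint of their bucket's period.
	fmt.Println("bucket timestamp:", time.Unix(0, bucketID*aggrPeriod+aggrPeriod/2))
}

Because a bucket ID is just the timestamp divided by the period, looking up a trail's bucket is a single map access, and expired buckets can be dropped by ID with delete(c.aggrBuckets, bucketID).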
diff --git a/stats/cloud/data.go b/stats/cloud/data.go
index aa7aebba9e9..29d5de3ccc6 100644
--- a/stats/cloud/data.go
+++ b/stats/cloud/data.go
@@ -5,7 +5,6 @@ import (
 
 	"github.com/loadimpact/k6/lib/metrics"
 	"github.com/loadimpact/k6/lib/netext"
-
 	"github.com/loadimpact/k6/stats"
 )
@@ -57,22 +56,79 @@ func NewSampleFromTrail(trail *netext.Trail) *Sample {
 	}
 }
 
-// SampleDataAggregatedMap is used in aggregated samples for HTTP requests.
-type SampleDataAggregatedMap struct {
-	Time   time.Time                   `json:"time"`
-	Type   string                      `json:"type"`
-	Count  uint64                      `json:"count"`
-	Tags   *stats.SampleTags           `json:"tags,omitempty"`
-	Values map[string]AggregatedMetric `json:"values,omitempty"`
+// SampleDataAggregatedHTTPReqs is used in aggregated samples for HTTP requests.
+type SampleDataAggregatedHTTPReqs struct {
+	Time   time.Time         `json:"time"`
+	Type   string            `json:"type"`
+	Count  uint64            `json:"count"`
+	Tags   *stats.SampleTags `json:"tags,omitempty"`
+	Values struct {
+		Duration       AggregatedMetric `json:"http_req_duration"`
+		Blocked        AggregatedMetric `json:"http_req_blocked"`
+		Connecting     AggregatedMetric `json:"http_req_connecting"`
+		TLSHandshaking AggregatedMetric `json:"http_req_tls_handshaking"`
+		Sending        AggregatedMetric `json:"http_req_sending"`
+		Waiting        AggregatedMetric `json:"http_req_waiting"`
+		Receiving      AggregatedMetric `json:"http_req_receiving"`
+	} `json:"values"`
+}
+
+// CalcAverages calculates and sets all `Avg` properties in the `Values` struct
+func (sdagg *SampleDataAggregatedHTTPReqs) CalcAverages() {
+	count := float64(sdagg.Count)
+	sdagg.Values.Duration.Avg = float64(sdagg.Values.Duration.sum) / count
+	sdagg.Values.Blocked.Avg = float64(sdagg.Values.Blocked.sum) / count
+	sdagg.Values.Connecting.Avg = float64(sdagg.Values.Connecting.sum) / count
+	sdagg.Values.TLSHandshaking.Avg = float64(sdagg.Values.TLSHandshaking.sum) / count
+	sdagg.Values.Sending.Avg = float64(sdagg.Values.Sending.sum) / count
+	sdagg.Values.Waiting.Avg = float64(sdagg.Values.Waiting.sum) / count
+	sdagg.Values.Receiving.Avg = float64(sdagg.Values.Receiving.sum) / count
 }
 
 // AggregatedMetric is used to store aggregated information for a
-// particular metric in an SampleDataAggregatedMap.
+// particular metric in a SampleDataAggregatedHTTPReqs.
 type AggregatedMetric struct {
-	Min    float64 `json:"min"`
-	Max    float64 `json:"max"`
-	Avg    float64 `json:"avg"`
-	StdDev float64 `json:"stddev"`
+	Min time.Duration `json:"min"`
+	Max time.Duration `json:"max"`
+	sum time.Duration `json:"-"` // ignored in JSON output; used together with SampleDataAggregatedHTTPReqs.Count to calculate Avg
+	Avg float64       `json:"avg"` // not updated automatically, has to be set externally
+}
+
+// Add the new duration to the internal sum and update Min and Max if necessary
+func (am *AggregatedMetric) Add(t time.Duration) {
+	if am.sum == 0 || am.Min > t {
+		am.Min = t
+	}
+	if am.Max < t {
+		am.Max = t
+	}
+	am.sum += t
 }
 
 type aggregationBucket map[*stats.SampleTags][]*netext.Trail
+
+type durations []time.Duration
+
+func (d durations) Len() int           { return len(d) }
+func (d durations) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
+func (d durations) Less(i, j int) bool { return d[i] < d[j] }
+func (d durations) GetNormalBounds(iqrCoef float64) (min, max time.Duration) {
+	l := len(d)
+	if l == 0 {
+		return
+	}
+
+	var q1, q3 time.Duration
+	if l%4 == 0 {
+		q1 = d[l/4]
+		q3 = d[(l/4)*3]
+	} else {
+		q1 = (d[l/4] + d[(l/4)+1]) / 2
+		q3 = (d[(l/4)*3] + d[(l/4)*3+1]) / 2
+	}
+
+	iqr := float64(q3 - q1)
+	min = q1 - time.Duration(iqrCoef*iqr)
+	max = q3 + time.Duration(iqrCoef*iqr)
+	return
+}
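
To see how GetNormalBounds classifies outliers, here is a self-contained sketch (again not part of the patch) of the same Tukey-fences computation; the eight durations are made up, and the 1.5 coefficient is the AggregationOutliers default from NewConfig():

package main

import (
	"fmt"
	"sort"
	"time"
)

func main() {
	// Hypothetical request durations; the last one is an obvious outlier.
	d := []time.Duration{
		90 * time.Millisecond, 95 * time.Millisecond, 100 * time.Millisecond,
		105 * time.Millisecond, 110 * time.Millisecond, 115 * time.Millisecond,
		120 * time.Millisecond, 2 * time.Second,
	}
	sort.Slice(d, func(i, j int) bool { return d[i] < d[j] })

	// Approximate Q1 and Q3 with the same indexing as GetNormalBounds
	// (this example hits its len(d)%4 == 0 branch).
	l := len(d)
	q1, q3 := d[l/4], d[(l/4)*3]

	// Tukey's fences: anything more than iqrCoef IQRs below Q1 or above Q3
	// is an outlier and bypasses aggregation.
	iqrCoef := 1.5
	iqr := float64(q3 - q1)
	min := q1 - time.Duration(iqrCoef*iqr)
	max := q3 + time.Duration(iqrCoef*iqr)

	for _, v := range d {
		if v < min || v > max {
			fmt.Printf("%v is an outlier (normal bounds: %v to %v)\n", v, min, max)
		}
	}
	// Prints: 2s is an outlier (normal bounds: 70ms to 150ms)
}

Trails outside these fences are emitted through NewSampleFromTrail as individual samples, so unusually slow requests remain individually visible instead of being folded into a single aggregated_trend point.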