Skip to content

Commit

Permalink
Calculate/Print latency in resource-management-service (#74)
Browse files Browse the repository at this point in the history
* Calculate p50/90/99

* Calculate overall service latency

* Bug fix for latency calculation

* Add metrics usage to aggregator & service api installer

* Bug fix for print latency report test

* Update lock to RW lock per CR.

* Added gate for aggregator checkpoint

* Add serializer encoded checkpoint as discussed.

* Add Test_MemoryUsageOfLatencyReport
  • Loading branch information
Sindica authored Jul 8, 2022
1 parent 8a53eee commit f52eed0
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 3 deletions.
10 changes: 10 additions & 0 deletions resource-management/pkg/aggregrator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/klog/v2"

distributor "global-resource-service/resource-management/pkg/common-lib/interfaces/distributor"
"global-resource-service/resource-management/pkg/common-lib/metrics"
"global-resource-service/resource-management/pkg/common-lib/types"
"global-resource-service/resource-management/pkg/common-lib/types/event"
)
Expand Down Expand Up @@ -177,6 +178,15 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64
if err != nil {
klog.Errorf("Error from JSON Unmarshal:", err)
}
if metrics.ResourceManagementMeasurement_Enabled {
for i := 0; i < len(ResponseObject.RegionNodeEvents); i++ {
for j := 0; j < len(ResponseObject.RegionNodeEvents[i]); j++ {
if ResponseObject.RegionNodeEvents[i][j] != nil {
ResponseObject.RegionNodeEvents[i][j].SetCheckpoint(metrics.Aggregator_Received)
}
}
}
}

return ResponseObject.RegionNodeEvents, ResponseObject.Length
}
Expand Down
7 changes: 5 additions & 2 deletions resource-management/pkg/common-lib/metrics/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package metrics
type ResourceManagementCheckpoint string

const (
Aggregator_Received ResourceManagementCheckpoint = "AGG_RECEIVED"
Aggregator_Received ResourceManagementCheckpoint = "AGG_RECEIVED"

Distributor_Received ResourceManagementCheckpoint = "DIS_RECEIVED"
Distributor_Sending ResourceManagementCheckpoint = "DIS_SENDING"
Distributor_Sent ResourceManagementCheckpoint = "DIS_SENT"
Serializer_Sent ResourceManagementCheckpoint = "SER_SENT"

Serializer_Encoded ResourceManagementCheckpoint = "SER_ENCODED"
Serializer_Sent ResourceManagementCheckpoint = "SER_SENT"
)

var ResourceManagementMeasurement_Enabled = true
62 changes: 62 additions & 0 deletions resource-management/pkg/common-lib/metrics/latency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package metrics

import (
"math"
"sort"
"time"
)

type LatencyReport struct {
TotalCount int
P50 time.Duration
P90 time.Duration
P99 time.Duration
}

type LatencyMetrics struct {
name string
latencies []time.Duration
}

func NewLatencyMetrics(name string) *LatencyMetrics {
return &LatencyMetrics{
name: name,
latencies: make([]time.Duration, 0),
}
}

func (m *LatencyMetrics) AddLatencyMetrics(newLatency time.Duration) {
m.latencies = append(m.latencies, newLatency)
}

func (m *LatencyMetrics) Len() int {
return len(m.latencies)
}

func (m *LatencyMetrics) Less(i, j int) bool {
return m.latencies[i] < m.latencies[j]
}

func (m *LatencyMetrics) Swap(i, j int) {
m.latencies[i], m.latencies[j] = m.latencies[j], m.latencies[i]
}

func (m *LatencyMetrics) GetSummary() *LatencyReport {
// sort
sort.Sort(m)
count := len(m.latencies)
if count == 0 {
return &LatencyReport{
TotalCount: count,
P50: 0,
P90: 0,
P99: 0,
}
}
return &LatencyReport{
TotalCount: count,
P50: m.latencies[int(math.Ceil(float64(count*50)/100))-1],
P90: m.latencies[int(math.Ceil(float64(count*90)/100))-1],
P99: m.latencies[int(math.Ceil(float64(count*99)/100))-1],
}
}
102 changes: 102 additions & 0 deletions resource-management/pkg/common-lib/metrics/latency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package metrics

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func Test_GetLatencyReport(t *testing.T) {
testCases := []struct {
name string
start int
gap int
count int
p50 int
p90 int
p99 int
}{
{
name: "Test 1, start 1",
start: 1,
gap: 1,
count: 1,
p50: 1,
p90: 1,
p99: 1,
},
{
name: "Test 1-100, start 1",
start: 1,
gap: 1,
count: 100,
p50: 50,
p90: 90,
p99: 99,
},
{
name: "Test 1-99, start 1",
start: 1,
gap: 1,
count: 99,
p50: 50,
p90: 90,
p99: 99,
},
{
name: "Test 1-1000, start 10",
start: 10,
gap: 1,
count: 1000,
p50: 509,
p90: 909,
p99: 999,
},
{
name: "Test 1-1M, start 1",
start: 1,
gap: 1,
count: 1000000,
p50: 500000,
p90: 900000,
p99: 990000,
},
{
name: "Test 1-10M, start 1",
start: 1,
gap: 1,
count: 10000000,
p50: 5000000,
p90: 9000000,
p99: 9900000,
},
{
name: "Test 10M-1, start 10M",
start: 10000000,
gap: -1,
count: 10000000,
p50: 5000000,
p90: 9000000,
p99: 9900000,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
start := tt.start

latency := NewLatencyMetrics(tt.name)
for i := 0; i < tt.count; i++ {
latency.AddLatencyMetrics(time.Duration(start) * time.Millisecond)
start += tt.gap
}

report := latency.GetSummary()
t.Logf("Test [%s], p50 %v, p90 %v, p99 %v, count %v", tt.name, report.P50, report.P90, report.P99, report.TotalCount)
assert.Equal(t, time.Duration(tt.p50)*time.Millisecond, report.P50)
assert.Equal(t, time.Duration(tt.p90)*time.Millisecond, report.P90)
assert.Equal(t, time.Duration(tt.p99)*time.Millisecond, report.P99)
assert.Equal(t, tt.count, report.TotalCount)
})
}
}
6 changes: 5 additions & 1 deletion resource-management/pkg/common-lib/types/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ func (e *NodeEvent) SetCheckpoint(checkpoint metrics.ResourceManagementCheckpoin
e.checkpoints = make(map[metrics.ResourceManagementCheckpoint]time.Time, 5)
}
if _, isOK := e.checkpoints[checkpoint]; !isOK {
e.checkpoints[checkpoint] = time.Now()
e.checkpoints[checkpoint] = time.Now().UTC()
} else {
klog.Errorf("Checkpoint %v already set for event %s, node id %s, rv %s", checkpoint, e.Type, e.Node.Id, e.Node.ResourceVersion)
}
}

func (e *NodeEvent) GetCheckpoints() map[metrics.ResourceManagementCheckpoint]time.Time {
return e.checkpoints
}
100 changes: 100 additions & 0 deletions resource-management/pkg/common-lib/types/event/event_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package event

import (
"k8s.io/klog/v2"
"sync"

"global-resource-service/resource-management/pkg/common-lib/metrics"
)

type LatencyMetricsAllCheckpoints struct {
Aggregator_Received *metrics.LatencyMetrics
Distributor_Received *metrics.LatencyMetrics
Distributor_Sending *metrics.LatencyMetrics
Distributor_Sent *metrics.LatencyMetrics
Serializer_Encoded *metrics.LatencyMetrics
Serializer_Sent *metrics.LatencyMetrics
}

var latencyMetricsAllCheckpoints *LatencyMetricsAllCheckpoints
var latencyMetricsLock sync.RWMutex

func init() {
latencyMetricsAllCheckpoints = new(LatencyMetricsAllCheckpoints)
latencyMetricsAllCheckpoints.Aggregator_Received = metrics.NewLatencyMetrics(string(metrics.Aggregator_Received))
latencyMetricsAllCheckpoints.Distributor_Received = metrics.NewLatencyMetrics(string(metrics.Distributor_Received))
latencyMetricsAllCheckpoints.Distributor_Sending = metrics.NewLatencyMetrics(string(metrics.Distributor_Sending))
latencyMetricsAllCheckpoints.Distributor_Sent = metrics.NewLatencyMetrics(string(metrics.Distributor_Sent))
latencyMetricsAllCheckpoints.Serializer_Encoded = metrics.NewLatencyMetrics(string(metrics.Serializer_Encoded))
latencyMetricsAllCheckpoints.Serializer_Sent = metrics.NewLatencyMetrics(string(metrics.Serializer_Sent))
}

func AddLatencyMetricsAllCheckpoints(e *NodeEvent) {
if e == nil {
klog.Error("Nil event")
}
checkpointsPerEvent := e.GetCheckpoints()
if checkpointsPerEvent == nil {
klog.Errorf("Event (%v, Id %s, RV %s) does not have checkpoint stamped", e.Type, e.Node.Id, e.Node.ResourceVersion)
}
lastUpdatedTime := e.Node.LastUpdatedTime

agg_received_time, isOK1 := checkpointsPerEvent[metrics.Aggregator_Received]
dis_received_time, isOK2 := checkpointsPerEvent[metrics.Distributor_Received]
dis_sending_time, isOK3 := checkpointsPerEvent[metrics.Distributor_Sending]
dis_sent_time, isOK4 := checkpointsPerEvent[metrics.Distributor_Sent]
serializer_encoded_time, isOK5 := checkpointsPerEvent[metrics.Serializer_Encoded]
serializer_sent_time, isOK6 := checkpointsPerEvent[metrics.Serializer_Sent]

latencyMetricsLock.Lock()
defer latencyMetricsLock.Unlock()
if isOK1 {
latencyMetricsAllCheckpoints.Aggregator_Received.AddLatencyMetrics(agg_received_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Aggregator_Received)
}
if isOK2 {
latencyMetricsAllCheckpoints.Distributor_Received.AddLatencyMetrics(dis_received_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Distributor_Received)
}
if isOK3 {
latencyMetricsAllCheckpoints.Distributor_Sending.AddLatencyMetrics(dis_sending_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Distributor_Sending)
}
if isOK4 {
latencyMetricsAllCheckpoints.Distributor_Sent.AddLatencyMetrics(dis_sent_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Distributor_Sent)
}
if isOK5 {
latencyMetricsAllCheckpoints.Serializer_Encoded.AddLatencyMetrics(serializer_encoded_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Serializer_Encoded)
}
if isOK6 {
latencyMetricsAllCheckpoints.Serializer_Sent.AddLatencyMetrics(serializer_sent_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Serializer_Sent)
}
}

func PrintLatencyReport() {
latencyMetricsLock.RLock()
agg_received_summary := latencyMetricsAllCheckpoints.Aggregator_Received.GetSummary()
dis_received_summary := latencyMetricsAllCheckpoints.Distributor_Received.GetSummary()
dis_sending_summary := latencyMetricsAllCheckpoints.Distributor_Sending.GetSummary()
dis_sent_summary := latencyMetricsAllCheckpoints.Distributor_Sent.GetSummary()
serializer_encoded_summary := latencyMetricsAllCheckpoints.Serializer_Encoded.GetSummary()
serializer_sent_summary := latencyMetricsAllCheckpoints.Serializer_Sent.GetSummary()

latencyMetricsLock.RUnlock()
metrics_Message := "[Metrics][%s] perc50 %v, perc90 %v, perc99 %v. Total count %v"
klog.Infof(metrics_Message, metrics.Aggregator_Received, agg_received_summary.P50, agg_received_summary.P90, agg_received_summary.P99, agg_received_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Received, dis_received_summary.P50, dis_received_summary.P90, dis_received_summary.P99, dis_received_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Sending, dis_sending_summary.P50, dis_sending_summary.P90, dis_sending_summary.P99, dis_sending_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Sent, dis_sent_summary.P50, dis_sent_summary.P90, dis_sent_summary.P99, dis_sent_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Serializer_Encoded, serializer_encoded_summary.P50, serializer_encoded_summary.P90, serializer_encoded_summary.P99, serializer_encoded_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Serializer_Sent, serializer_sent_summary.P50, serializer_sent_summary.P90, serializer_sent_summary.P99, serializer_sent_summary.TotalCount)
}
Loading

0 comments on commit f52eed0

Please sign in to comment.