Change checkpoints from map to array (#93)
* Revert unnecessary changes in PR 85

* Move feature check out of business logic

* Fix spelling error

* Change checkpoints from map to array

* Add test case to TestSingleRPMutipleClients_Workflow: 1M nodes with 50 clients, each has 15000 hosts, each got 100K update events

* Print registration result properly

* Log latency detail for each event

* Add back testcases

* Use constants for checkpoint name

* Add perf data for distributor concurrency test after adding checkpoints with array

* Update per CR
Sindica authored Jul 21, 2022
1 parent 1ac3b09 commit 526147a
Showing 9 changed files with 108 additions and 69 deletions.
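
The gist of the change, as a minimal self-contained sketch with simplified, hypothetical names (the real types are metrics.ResourceManagementCheckpoint and event.NodeEvent): checkpoint timestamps move from a map keyed by string constants to a slice indexed by int constants, so stamping a checkpoint is a single index write with no hashing and no per-key allocation.

```go
package main

import (
	"fmt"
	"time"
)

// Checkpoint sketches the new int-based checkpoint type; the real code uses
// metrics.ResourceManagementCheckpoint and different constant names.
type Checkpoint int

const (
	AggregatorReceived Checkpoint = iota
	DistributorReceived
	LenCheckpoints // number of checkpoints; sizes the slice
)

// NodeEvent holds one timestamp slot per checkpoint instead of a map.
type NodeEvent struct {
	checkpoints []time.Time
}

func NewNodeEvent() *NodeEvent {
	// One fixed-size allocation up front; stamping is an index write.
	return &NodeEvent{checkpoints: make([]time.Time, LenCheckpoints)}
}

func (e *NodeEvent) SetCheckpoint(c Checkpoint) {
	e.checkpoints[c] = time.Now().UTC()
}

func main() {
	e := NewNodeEvent()
	e.SetCheckpoint(AggregatorReceived)
	// An unstamped checkpoint is simply the time.Time zero value.
	fmt.Println(e.checkpoints[AggregatorReceived].IsZero())  // false
	fmt.Println(e.checkpoints[DistributorReceived].IsZero()) // true
}
```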
1 change: 0 additions & 1 deletion resource-management/pkg/aggregrator/aggregator.go
@@ -198,7 +198,6 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64
for i := 0; i < len(ResponseObject.RegionNodeEvents); i++ {
for j := 0; j < len(ResponseObject.RegionNodeEvents[i]); j++ {
if ResponseObject.RegionNodeEvents[i][j] != nil {
ResponseObject.RegionNodeEvents[i][j].CreateCheckPointsMap()
ResponseObject.RegionNodeEvents[i][j].SetCheckpoint(metrics.Aggregator_Received)
}
}
29 changes: 22 additions & 7 deletions resource-management/pkg/common-lib/metrics/checkpoints.go
@@ -1,16 +1,31 @@
package metrics

type ResourceManagementCheckpoint string
type ResourceManagementCheckpoint int

const (
Aggregator_Received ResourceManagementCheckpoint = "AGG_RECEIVED"
Aggregator_Received ResourceManagementCheckpoint = 0

Distributor_Received ResourceManagementCheckpoint = "DIS_RECEIVED"
Distributor_Sending ResourceManagementCheckpoint = "DIS_SENDING"
Distributor_Sent ResourceManagementCheckpoint = "DIS_SENT"
Distributor_Received ResourceManagementCheckpoint = 1
Distributor_Sending ResourceManagementCheckpoint = 2
Distributor_Sent ResourceManagementCheckpoint = 3

Serializer_Encoded ResourceManagementCheckpoint = "SER_ENCODED"
Serializer_Sent ResourceManagementCheckpoint = "SER_SENT"
Serializer_Encoded ResourceManagementCheckpoint = 4
Serializer_Sent ResourceManagementCheckpoint = 5

Len_ResourceManagementCheckpoint = 6
)

type ResourceManagementCheckpointName string

const (
Aggregator_Received_Name ResourceManagementCheckpointName = "AGG_RECEIVED"

Distributor_Received_Name ResourceManagementCheckpointName = "DIS_RECEIVED"
Distributor_Sending_Name ResourceManagementCheckpointName = "DIS_SENDING"
Distributor_Sent_Name ResourceManagementCheckpointName = "DIS_SENT"

Serializer_Encoded_Name ResourceManagementCheckpointName = "SER_ENCODED"
Serializer_Sent_Name ResourceManagementCheckpointName = "SER_SENT"
)

var ResourceManagementMeasurement_Enabled = true
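
The diff above splits a checkpoint's identity (an int used as a slice index) from its display name (a string used in logs). A hypothetical helper — not part of this commit — could map one to the other, assuming the constants declared in checkpoints.go:

```go
// Hypothetical helper (not in this commit): look up the display name for an
// int checkpoint, assuming the constants declared in checkpoints.go above.
package metrics

var checkpointNames = [Len_ResourceManagementCheckpoint]ResourceManagementCheckpointName{
	Aggregator_Received:  Aggregator_Received_Name,
	Distributor_Received: Distributor_Received_Name,
	Distributor_Sending:  Distributor_Sending_Name,
	Distributor_Sent:     Distributor_Sent_Name,
	Serializer_Encoded:   Serializer_Encoded_Name,
	Serializer_Sent:      Serializer_Sent_Name,
}

// Name returns the string name for a checkpoint index, or "UNKNOWN" for an
// out-of-range value.
func (c ResourceManagementCheckpoint) Name() ResourceManagementCheckpointName {
	if c < 0 || int(c) >= Len_ResourceManagementCheckpoint {
		return "UNKNOWN"
	}
	return checkpointNames[c]
}
```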
6 changes: 3 additions & 3 deletions resource-management/pkg/common-lib/metrics/latency.go
@@ -14,13 +14,13 @@ type LatencyReport struct {
}

type LatencyMetrics struct {
name string
value int
latencies []time.Duration
}

func NewLatencyMetrics(name string) *LatencyMetrics {
func NewLatencyMetrics(value int) *LatencyMetrics {
return &LatencyMetrics{
name: name,
value: value,
latencies: make([]time.Duration, 0),
}
}
2 changes: 1 addition & 1 deletion resource-management/pkg/common-lib/metrics/latency_test.go
Expand Up @@ -85,7 +85,7 @@ func Test_GetLatencyReport(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
start := tt.start

latency := NewLatencyMetrics(tt.name)
latency := NewLatencyMetrics(0)
for i := 0; i < tt.count; i++ {
latency.AddLatencyMetrics(time.Duration(start) * time.Millisecond)
start += tt.gap
20 changes: 6 additions & 14 deletions resource-management/pkg/common-lib/types/event/event.go
@@ -1,7 +1,6 @@
package event

import (
"k8s.io/klog/v2"
"time"

"global-resource-service/resource-management/pkg/common-lib/metrics"
@@ -22,20 +21,14 @@ const (
type NodeEvent struct {
Type EventType
Node *types.LogicalNode
checkpoints map[metrics.ResourceManagementCheckpoint]time.Time
checkpoints []time.Time
}

func NewNodeEvent(node *types.LogicalNode, eventType EventType) *NodeEvent {
return &NodeEvent{
Type: eventType,
Node: node,
checkpoints: make(map[metrics.ResourceManagementCheckpoint]time.Time, 5),
}
}

func (e *NodeEvent) CreateCheckPointsMap() {
if e.checkpoints == nil {
e.checkpoints = make(map[metrics.ResourceManagementCheckpoint]time.Time, 5)
checkpoints: make([]time.Time, metrics.Len_ResourceManagementCheckpoint),
}
}

@@ -44,13 +37,12 @@ func (e *NodeEvent) SetCheckpoint(checkpoint metrics.ResourceManagementCheckpoin
return
}

if _, isOK := e.checkpoints[checkpoint]; !isOK {
e.checkpoints[checkpoint] = time.Now().UTC()
} else {
klog.Errorf("Checkpoint %v already set for event %s, node id %s, rv %s", checkpoint, e.Type, e.Node.Id, e.Node.ResourceVersion)
if e.checkpoints == nil {
e.checkpoints = make([]time.Time, metrics.Len_ResourceManagementCheckpoint)
}
e.checkpoints[checkpoint] = time.Now().UTC()
}

func (e *NodeEvent) GetCheckpoints() map[metrics.ResourceManagementCheckpoint]time.Time {
func (e *NodeEvent) GetCheckpoints() []time.Time {
return e.checkpoints
}
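
One behavioral difference in this hunk: the map-based SetCheckpoint logged an error when the same checkpoint was stamped twice, while the slice version overwrites silently. If a caller wanted that guard back, a hypothetical variant (not part of this commit) could check the zero value before writing:

```go
// Hypothetical variant (not in this commit): stamp a checkpoint only if it has
// not been set yet, reporting whether the write happened. Assumes the NodeEvent
// type and metrics constants shown in the diffs above.
func (e *NodeEvent) setCheckpointOnce(checkpoint metrics.ResourceManagementCheckpoint) bool {
	if e.checkpoints == nil {
		e.checkpoints = make([]time.Time, metrics.Len_ResourceManagementCheckpoint)
	}
	if !e.checkpoints[checkpoint].IsZero() {
		return false // already stamped; the old map version logged an error here
	}
	e.checkpoints[checkpoint] = time.Now().UTC()
	return true
}
```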
59 changes: 35 additions & 24 deletions resource-management/pkg/common-lib/types/event/event_metrics.go
@@ -21,15 +21,18 @@ var latencyMetricsLock sync.RWMutex

func init() {
latencyMetricsAllCheckpoints = new(LatencyMetricsAllCheckpoints)
latencyMetricsAllCheckpoints.Aggregator_Received = metrics.NewLatencyMetrics(string(metrics.Aggregator_Received))
latencyMetricsAllCheckpoints.Distributor_Received = metrics.NewLatencyMetrics(string(metrics.Distributor_Received))
latencyMetricsAllCheckpoints.Distributor_Sending = metrics.NewLatencyMetrics(string(metrics.Distributor_Sending))
latencyMetricsAllCheckpoints.Distributor_Sent = metrics.NewLatencyMetrics(string(metrics.Distributor_Sent))
latencyMetricsAllCheckpoints.Serializer_Encoded = metrics.NewLatencyMetrics(string(metrics.Serializer_Encoded))
latencyMetricsAllCheckpoints.Serializer_Sent = metrics.NewLatencyMetrics(string(metrics.Serializer_Sent))
latencyMetricsAllCheckpoints.Aggregator_Received = metrics.NewLatencyMetrics(int(metrics.Aggregator_Received))
latencyMetricsAllCheckpoints.Distributor_Received = metrics.NewLatencyMetrics(int(metrics.Distributor_Received))
latencyMetricsAllCheckpoints.Distributor_Sending = metrics.NewLatencyMetrics(int(metrics.Distributor_Sending))
latencyMetricsAllCheckpoints.Distributor_Sent = metrics.NewLatencyMetrics(int(metrics.Distributor_Sent))
latencyMetricsAllCheckpoints.Serializer_Encoded = metrics.NewLatencyMetrics(int(metrics.Serializer_Encoded))
latencyMetricsAllCheckpoints.Serializer_Sent = metrics.NewLatencyMetrics(int(metrics.Serializer_Sent))
}

func AddLatencyMetricsAllCheckpoints(e *NodeEvent) {
if !metrics.ResourceManagementMeasurement_Enabled {
return
}
if e == nil {
klog.Error("Nil event")
}
@@ -39,45 +42,53 @@ func AddLatencyMetricsAllCheckpoints(e *NodeEvent) {
}
lastUpdatedTime := e.Node.LastUpdatedTime

agg_received_time, isOK1 := checkpointsPerEvent[metrics.Aggregator_Received]
dis_received_time, isOK2 := checkpointsPerEvent[metrics.Distributor_Received]
dis_sending_time, isOK3 := checkpointsPerEvent[metrics.Distributor_Sending]
dis_sent_time, isOK4 := checkpointsPerEvent[metrics.Distributor_Sent]
serializer_encoded_time, isOK5 := checkpointsPerEvent[metrics.Serializer_Encoded]
serializer_sent_time, isOK6 := checkpointsPerEvent[metrics.Serializer_Sent]
agg_received_time := checkpointsPerEvent[metrics.Aggregator_Received]
dis_received_time := checkpointsPerEvent[metrics.Distributor_Received]
dis_sending_time := checkpointsPerEvent[metrics.Distributor_Sending]
dis_sent_time := checkpointsPerEvent[metrics.Distributor_Sent]
serializer_encoded_time := checkpointsPerEvent[metrics.Serializer_Encoded]
serializer_sent_time := checkpointsPerEvent[metrics.Serializer_Sent]

latencyMetricsLock.Lock()
defer latencyMetricsLock.Unlock()
if isOK1 {
if !agg_received_time.IsZero() {
latencyMetricsAllCheckpoints.Aggregator_Received.AddLatencyMetrics(agg_received_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Aggregator_Received)
}
if isOK2 {
if !dis_received_time.IsZero() {
latencyMetricsAllCheckpoints.Distributor_Received.AddLatencyMetrics(dis_received_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Distributor_Received)
}
if isOK3 {
if !dis_sending_time.IsZero() {
latencyMetricsAllCheckpoints.Distributor_Sending.AddLatencyMetrics(dis_sending_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Distributor_Sending)
}
if isOK4 {
if !dis_sent_time.IsZero() {
latencyMetricsAllCheckpoints.Distributor_Sent.AddLatencyMetrics(dis_sent_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Distributor_Sent)
}
if isOK5 {
if !serializer_encoded_time.IsZero() {
latencyMetricsAllCheckpoints.Serializer_Encoded.AddLatencyMetrics(serializer_encoded_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Serializer_Encoded)
}
if isOK6 {
if !serializer_sent_time.IsZero() {
latencyMetricsAllCheckpoints.Serializer_Sent.AddLatencyMetrics(serializer_sent_time.Sub(lastUpdatedTime))
} else {
klog.Errorf("Event (%v, Id %s, RV %s) does not have %s stamped", e.Type, e.Node.Id, e.Node.ResourceVersion, metrics.Serializer_Sent)
}
klog.V(6).Infof("[Metrics][Detail] node %v RV %s: %s: %v, %s: %v, %s: %v, %s: %v, %s: %v, %s: %v",
e.Node.Id, e.Node.ResourceVersion,
metrics.Aggregator_Received_Name, agg_received_time.Sub(lastUpdatedTime),
metrics.Distributor_Received_Name, dis_received_time.Sub(lastUpdatedTime),
metrics.Distributor_Sending_Name, dis_sending_time.Sub(lastUpdatedTime),
metrics.Distributor_Sent_Name, dis_sent_time.Sub(lastUpdatedTime),
metrics.Serializer_Encoded_Name, serializer_encoded_time.Sub(lastUpdatedTime),
metrics.Serializer_Sent_Name, serializer_sent_time.Sub(lastUpdatedTime))
}

func PrintLatencyReport() {
@@ -91,10 +102,10 @@ func PrintLatencyReport() {

latencyMetricsLock.RUnlock()
metrics_Message := "[Metrics][%s] perc50 %v, perc90 %v, perc99 %v. Total count %v"
klog.Infof(metrics_Message, metrics.Aggregator_Received, agg_received_summary.P50, agg_received_summary.P90, agg_received_summary.P99, agg_received_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Received, dis_received_summary.P50, dis_received_summary.P90, dis_received_summary.P99, dis_received_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Sending, dis_sending_summary.P50, dis_sending_summary.P90, dis_sending_summary.P99, dis_sending_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Sent, dis_sent_summary.P50, dis_sent_summary.P90, dis_sent_summary.P99, dis_sent_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Serializer_Encoded, serializer_encoded_summary.P50, serializer_encoded_summary.P90, serializer_encoded_summary.P99, serializer_encoded_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Serializer_Sent, serializer_sent_summary.P50, serializer_sent_summary.P90, serializer_sent_summary.P99, serializer_sent_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Aggregator_Received_Name, agg_received_summary.P50, agg_received_summary.P90, agg_received_summary.P99, agg_received_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Received_Name, dis_received_summary.P50, dis_received_summary.P90, dis_received_summary.P99, dis_received_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Sending_Name, dis_sending_summary.P50, dis_sending_summary.P90, dis_sending_summary.P99, dis_sending_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Distributor_Sent_Name, dis_sent_summary.P50, dis_sent_summary.P90, dis_sent_summary.P99, dis_sent_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Serializer_Encoded_Name, serializer_encoded_summary.P50, serializer_encoded_summary.P90, serializer_encoded_summary.P99, serializer_encoded_summary.TotalCount)
klog.Infof(metrics_Message, metrics.Serializer_Sent_Name, serializer_sent_summary.P50, serializer_sent_summary.P90, serializer_sent_summary.P99, serializer_sent_summary.TotalCount)
}
22 changes: 11 additions & 11 deletions resource-management/pkg/distributor/distributor.go
@@ -61,7 +61,7 @@ func createNodeStore() *storage.NodeStore {
// TODO: post 630, allocate resources per request for different type of hardware and regions
func (dis *ResourceDistributor) RegisterClient(client *types.Client) error {
clientId := client.ClientId
_, err := dis.allocateNodesToClient(clientId, client.Resource.TotalMachines)
assignedHostNum, err := dis.allocateNodesToClient(clientId, client.Resource.TotalMachines)
if err != nil {
klog.Errorf("Error allocate resource for client. Error %v\n", err)
return err
@@ -73,27 +73,27 @@ func (dis *ResourceDistributor) RegisterClient(client *types.Client) error {
return err
}

klog.Errorf("Registered client id: %s\n", clientId)
klog.Infof("Registered client id: %s, requested host # = %d, assigned host # = %d\n", clientId, client.Resource.TotalMachines, assignedHostNum)
return nil
}

func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requestedHostNum int) (bool, error) {
func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requestedHostNum int) (int, error) {
dis.allocateLock.Lock()
defer dis.allocateLock.Unlock()
if requestedHostNum <= MinimalRequestHostNum {
return false, types.Error_HostRequestLessThanMiniaml
return 0, types.Error_HostRequestLessThanMiniaml
} else if requestedHostNum > dis.defaultNodeStore.GetTotalHostNum() {
return false, types.Error_HostRequestExceedLimit
return 0, types.Error_HostRequestExceedLimit
} else if !dis.defaultNodeStore.CheckFreeCapacity(requestedHostNum) {
return false, types.Error_HostRequestExceedCapacity
return 0, types.Error_HostRequestExceedCapacity
}

// check client id existence
if _, isOK := dis.nodeEventQueueMap[clientId]; isOK {
return false, types.Error_ClientIdExisted
return 0, types.Error_ClientIdExisted
}
if _, isOK := dis.clientToStores[clientId]; isOK {
return false, types.Error_ClientIdExisted
return 0, types.Error_ClientIdExisted
}

// allocate virtual nodes to client
@@ -106,7 +106,7 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested
}
}
if len(freeStores) == 0 {
return false, errors.New("No available hosts")
return 0, errors.New("No available hosts")
}

// Get sorted virtual node stores based on ordering criteria
@@ -123,7 +123,7 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested
}
}
if !hostAssignIsOK {
return false, errors.New("Not enough hosts")
return 0, errors.New("Not enough hosts")
}

// Create event queue for client
@@ -140,7 +140,7 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested
// persist virtual node assignment
dis.persistVirtualNodesAssignment(clientId, selectedStores)

return true, nil
return assignedHostCount, nil
}

func (dis *ResourceDistributor) addBookmarkEvent(stores []*storage.VirtualNodeStore, eventQueue *cache.NodeEventQueue) {
@@ -42,6 +42,13 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) {
hostPerClient: 500,
updateEventNum: 100000,
},
{
name: "Test 1M nodes with 50 clients each has 15000 , each got 100K update events",
nodeNum: 1000000,
clientNum: 50,
hostPerClient: 15000,
updateEventNum: 100000,
},
}

for _, tt := range testCases {
@@ -97,7 +104,7 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) {
// check each node event
nodeIds := make(map[string]bool)
for _, node := range nodes {
nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)}
nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition(node.GeoInfo.ResourcePartition)}
assert.NotNil(t, nodeLoc)
assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64())
if _, isOK := nodeIds[node.Id]; isOK {
@@ -282,7 +289,7 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) {
// check each node event
nodeIds := make(map[string]bool)
for _, node := range nodes {
nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)}
nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition(node.GeoInfo.ResourcePartition)}
assert.NotNil(t, nodeLoc)
assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64())
if _, isOK := nodeIds[node.Id]; isOK {
@@ -419,11 +426,20 @@ Processing 2000 AddNode events took 4.219236ms.
Processing 20000 AddNode events took 28.11632ms.
Processing 200000 AddNode events took 294.131531ms.
Processing 2000000 AddNode events took 4.040301206s.
. Added checkpoints with array
Processing 20 AddNode events took 314.825µs.
Processing 200 AddNode events took 982.61µs.
Processing 2000 AddNode events took 6.241914ms.
Processing 20000 AddNode events took 57.767247ms.
Processing 200000 AddNode events took 583.935804ms.
Processing 2000000 AddNode events took 6.429718129s.
*/
func TestProcessEvents_TwoRPs_AddNodes_Sequential(t *testing.T) {
distributor := setUp()
defer tearDown()

//metrics.ResourceManagementMeasurement_Enabled = false
nodeCounts := []int{10, 100, 1000, 10000, 100000, 1000000}
// generate add node events
for i := 0; i < len(nodeCounts); i++ {
@@ -493,6 +509,14 @@ Processing 2000 AddNode events took 2.68838ms.
Processing 20000 AddNode events took 18.140626ms.
Processing 200000 AddNode events took 175.578763ms.
Processing 2000000 AddNode events took 2.460715575s.
. Added checkpoints with array
Processing 20 AddNode events took 288.991µs.
Processing 200 AddNode events took 629.264µs.
Processing 2000 AddNode events took 3.910318ms.
Processing 20000 AddNode events took 32.042223ms.
Processing 200000 AddNode events took 365.70946ms.
Processing 2000000 AddNode events took 4.189824513s.
*/
func TestProcessEvents_TwoRPs_Concurrent(t *testing.T) {
distributor := setUp()
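
The timings in the comments above measure end-to-end event processing in the distributor tests, before and after the checkpoint change. To isolate just the cost of stamping checkpoints into a map versus a pre-sized slice, a hypothetical micro-benchmark sketch (not part of this commit) could look like:

```go
package metrics_test

import (
	"testing"
	"time"
)

const numCheckpoints = 6 // matches Len_ResourceManagementCheckpoint

// BenchmarkMapCheckpoints stamps every checkpoint into a freshly made map,
// similar in shape to the pre-change code (which keyed the map by a
// string-typed checkpoint constant).
func BenchmarkMapCheckpoints(b *testing.B) {
	for i := 0; i < b.N; i++ {
		m := make(map[int]time.Time, numCheckpoints)
		for c := 0; c < numCheckpoints; c++ {
			m[c] = time.Now().UTC()
		}
	}
}

// BenchmarkSliceCheckpoints stamps every checkpoint into a pre-sized slice,
// as this commit does.
func BenchmarkSliceCheckpoints(b *testing.B) {
	for i := 0; i < b.N; i++ {
		s := make([]time.Time, numCheckpoints)
		for c := 0; c < numCheckpoints; c++ {
			s[c] = time.Now().UTC()
		}
	}
}
```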