diff --git a/resource-management/pkg/aggregrator/aggregator.go b/resource-management/pkg/aggregrator/aggregator.go index 2345590f..4211259c 100644 --- a/resource-management/pkg/aggregrator/aggregator.go +++ b/resource-management/pkg/aggregrator/aggregator.go @@ -179,6 +179,7 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64 for i := 0; i < len(ResponseObject.RegionNodeEvents); i++ { for j := 0; j < len(ResponseObject.RegionNodeEvents[i]); j++ { if ResponseObject.RegionNodeEvents[i][j] != nil { + ResponseObject.RegionNodeEvents[i][j].CreateCheckPointsMap() ResponseObject.RegionNodeEvents[i][j].SetCheckpoint(metrics.Aggregator_Received) } } diff --git a/resource-management/pkg/common-lib/types/event/event.go b/resource-management/pkg/common-lib/types/event/event.go index 6ba779c2..ecced6e3 100644 --- a/resource-management/pkg/common-lib/types/event/event.go +++ b/resource-management/pkg/common-lib/types/event/event.go @@ -27,8 +27,15 @@ type NodeEvent struct { func NewNodeEvent(node *types.LogicalNode, eventType EventType) *NodeEvent { return &NodeEvent{ - Type: eventType, - Node: node, + Type: eventType, + Node: node, + checkpoints: make(map[metrics.ResourceManagementCheckpoint]time.Time, 5), + } +} + +func (e *NodeEvent) CreateCheckPointsMap() { + if e.checkpoints == nil { + e.checkpoints = make(map[metrics.ResourceManagementCheckpoint]time.Time, 5) } } @@ -36,9 +43,7 @@ func (e *NodeEvent) SetCheckpoint(checkpoint metrics.ResourceManagementCheckpoin if !metrics.ResourceManagementMeasurement_Enabled { return } - if e.checkpoints == nil { - e.checkpoints = make(map[metrics.ResourceManagementCheckpoint]time.Time, 5) - } + if _, isOK := e.checkpoints[checkpoint]; !isOK { e.checkpoints[checkpoint] = time.Now().UTC() } else { diff --git a/resource-management/pkg/distributor/cache/eventqueue.go b/resource-management/pkg/distributor/cache/eventqueue.go index ad5b61a1..f8f8655c 100644 --- a/resource-management/pkg/distributor/cache/eventqueue.go +++ b/resource-management/pkg/distributor/cache/eventqueue.go @@ -173,9 +173,11 @@ func (eq *NodeEventQueue) Watch(rvs types.InternalResourceVersionMap, clientWatc if !ok { break } + klog.V(9).Infof("Sending event with node id %v", event.Node.Id) event.SetCheckpoint(metrics.Distributor_Sending) downstreamCh <- event event.SetCheckpoint(metrics.Distributor_Sent) + klog.V(9).Infof("Event with node id %v sent", event.Node.Id) } } diff --git a/resource-management/pkg/service-api/endpoints/installer.go b/resource-management/pkg/service-api/endpoints/installer.go index 281e8ffe..0ae0bf35 100644 --- a/resource-management/pkg/service-api/endpoints/installer.go +++ b/resource-management/pkg/service-api/endpoints/installer.go @@ -220,7 +220,7 @@ func (i *Installer) serverWatch(resp http.ResponseWriter, req *http.Request, cli return } - klog.V(9).Infof("Getting event from distributor: %v, %v", *record, *record.Node) + klog.V(9).Infof("Getting event from distributor, node Id: %v", record.Node.Id) if err := json.NewEncoder(resp).Encode(*record); err != nil { klog.V(3).Infof("encoding record failed. error %v", err)