Skip to content

Commit

Permalink
fix issue #79 and #80
Browse files Browse the repository at this point in the history
  • Loading branch information
yb01 committed Jul 13, 2022
1 parent e0e95d1 commit 3505b3a
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 9 deletions.
4 changes: 1 addition & 3 deletions resource-management/pkg/aggregrator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func (a *Aggregator) Run() (err error) {

klog.V(3).Infof("Starting loop pulling nodes from region: %v", a.urls[i])
for {
klog.V(9).Infof("Wait for 100 milisecond...")
time.Sleep(100 * time.Millisecond)

// Call the Pull methods
Expand Down Expand Up @@ -149,8 +148,6 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64
path = httpPrefix + c.BaseURL + "/resources/subsequentpull"
}

klog.V(9).Infof("CRV : (%v)", crv)

bytes, _ := json.Marshal(PullDataFromRRM{BatchLength: batchLength, CRV: crv.Copy()})
req, err := http.NewRequest(http.MethodGet, path, strings.NewReader((string(bytes))))
if err != nil {
Expand Down Expand Up @@ -182,6 +179,7 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64
for i := 0; i < len(ResponseObject.RegionNodeEvents); i++ {
for j := 0; j < len(ResponseObject.RegionNodeEvents[i]); j++ {
if ResponseObject.RegionNodeEvents[i][j] != nil {
ResponseObject.RegionNodeEvents[i][j].CreatCheckPointsMap()
ResponseObject.RegionNodeEvents[i][j].SetCheckpoint(metrics.Aggregator_Received)
}
}
Expand Down
15 changes: 10 additions & 5 deletions resource-management/pkg/common-lib/types/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,23 @@ type NodeEvent struct {

func NewNodeEvent(node *types.LogicalNode, eventType EventType) *NodeEvent {
return &NodeEvent{
Type: eventType,
Node: node,
Type: eventType,
Node: node,
checkpoints: make(map[metrics.ResourceManagementCheckpoint]time.Time, 5),
}
}

func (e *NodeEvent) CreatCheckPointsMap() {
if e.checkpoints == nil {
e.checkpoints = make(map[metrics.ResourceManagementCheckpoint]time.Time, 5)
}
}

func (e *NodeEvent) SetCheckpoint(checkpoint metrics.ResourceManagementCheckpoint) {
if !metrics.ResourceManagementMeasurement_Enabled {
return
}
if e.checkpoints == nil {
e.checkpoints = make(map[metrics.ResourceManagementCheckpoint]time.Time, 5)
}

if _, isOK := e.checkpoints[checkpoint]; !isOK {
e.checkpoints[checkpoint] = time.Now().UTC()
} else {
Expand Down
2 changes: 2 additions & 0 deletions resource-management/pkg/distributor/cache/eventqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,11 @@ func (eq *NodeEventQueue) Watch(rvs types.InternalResourceVersionMap, clientWatc
if !ok {
break
}
klog.V(9).Infof("Sending event with node id %v", event.Node.Id)
event.SetCheckpoint(metrics.Distributor_Sending)
downstreamCh <- event
event.SetCheckpoint(metrics.Distributor_Sent)
klog.V(9).Infof("Event with node id %v sent", event.Node.Id)
}
}

Expand Down
2 changes: 1 addition & 1 deletion resource-management/pkg/service-api/endpoints/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (i *Installer) serverWatch(resp http.ResponseWriter, req *http.Request, cli
return
}

klog.V(9).Infof("Getting event from distributor: %v, %v", *record, *record.Node)
klog.V(9).Infof("Getting event from distributor, node Id: %v", record.Node.Id)

if err := json.NewEncoder(resp).Encode(*record); err != nil {
klog.V(3).Infof("encoding record failed. error %v", err)
Expand Down

0 comments on commit 3505b3a

Please sign in to comment.