diff --git a/resource-management/pkg/aggregrator/aggregator.go b/resource-management/pkg/aggregrator/aggregator.go index af9f21a8..b229bc43 100644 --- a/resource-management/pkg/aggregrator/aggregator.go +++ b/resource-management/pkg/aggregrator/aggregator.go @@ -32,7 +32,7 @@ type ClientOfRRM struct { // type ResponseFromRRM struct { RegionNodeEvents [][]*event.NodeEvent - RvMap types.ResourceVersionMap + RvMap types.TransitResourceVersionMap Length uint64 } @@ -41,7 +41,7 @@ type ResponseFromRRM struct { type PullDataFromRRM struct { BatchLength uint64 DefaultCRV uint64 - CRV types.ResourceVersionMap + CRV types.TransitResourceVersionMap } const ( @@ -76,7 +76,7 @@ func (a *Aggregator) Run() (err error) { klog.V(3).Infof("Existing goroutine for region: %v", a.urls[i]) }() - var crv types.ResourceVersionMap + var crv types.TransitResourceVersionMap var regionNodeEvents [][]*event.NodeEvent var length uint64 var eventProcess bool @@ -138,7 +138,7 @@ func (a *Aggregator) createClient(url string) *ClientOfRRM { // or // Call the resource region manager's SubsequentPull method {url}/resources/subsequentpull when crv is not nil // -func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64, crv types.ResourceVersionMap) ([][]*event.NodeEvent, uint64) { +func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64, crv types.TransitResourceVersionMap) ([][]*event.NodeEvent, uint64) { var path string if len(crv) == 0 { @@ -183,7 +183,7 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64 // Call resource region manager's POST method {url}/resources/crv to update the CRV // error indicate failed POST, CRV means Composite Resource Version // -func (a *Aggregator) postCRV(c *ClientOfRRM, crv types.ResourceVersionMap) error { +func (a *Aggregator) postCRV(c *ClientOfRRM, crv types.TransitResourceVersionMap) error { path := httpPrefix + c.BaseURL + "/resources/crv" bytes, _ := json.Marshal(PullDataFromRRM{CRV: crv.Copy()}) req, err := http.NewRequest(http.MethodPost, path, strings.NewReader((string(bytes)))) diff --git a/resource-management/pkg/common-lib/interfaces/distributor/interfaces.go b/resource-management/pkg/common-lib/interfaces/distributor/interfaces.go index 4698e48a..d0527404 100644 --- a/resource-management/pkg/common-lib/interfaces/distributor/interfaces.go +++ b/resource-management/pkg/common-lib/interfaces/distributor/interfaces.go @@ -8,7 +8,7 @@ import ( type Interface interface { RegisterClient(*types.Client) error - ListNodesForClient(clientId string) ([]*types.LogicalNode, types.ResourceVersionMap, error) - Watch(clientId string, rvs types.ResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error - ProcessEvents(events []*event.NodeEvent) (bool, types.ResourceVersionMap) + ListNodesForClient(clientId string) ([]*types.LogicalNode, types.TransitResourceVersionMap, error) + Watch(clientId string, rvs types.TransitResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error + ProcessEvents(events []*event.NodeEvent) (bool, types.TransitResourceVersionMap) } diff --git a/resource-management/pkg/common-lib/interfaces/store/interface.go b/resource-management/pkg/common-lib/interfaces/store/interface.go index 5b204f9c..ba55ea65 100644 --- a/resource-management/pkg/common-lib/interfaces/store/interface.go +++ b/resource-management/pkg/common-lib/interfaces/store/interface.go @@ -36,7 +36,7 @@ type NodeStoreStatus struct { VirtualNodeNumPerRP int // Latest resource version map - CurrentResourceVerions types.ResourceVersionMap + CurrentResourceVerions types.TransitResourceVersionMap } func (nsStatus *NodeStoreStatus) GetKey() string { diff --git a/resource-management/pkg/common-lib/types/compositeresourceversion.go b/resource-management/pkg/common-lib/types/compositeresourceversion.go index 7e81133e..b9acce4c 100644 --- a/resource-management/pkg/common-lib/types/compositeresourceversion.go +++ b/resource-management/pkg/common-lib/types/compositeresourceversion.go @@ -1,6 +1,8 @@ package types import ( + "encoding/json" + "global-resource-service/resource-management/pkg/common-lib/types/location" ) @@ -10,11 +12,40 @@ type CompositeResourceVersion struct { ResourceVersion uint64 } +type RvLocation struct { + Region location.Region + Partition location.ResourcePartition +} + +func (loc RvLocation) MarshalText() (text []byte, err error) { + type l RvLocation + return json.Marshal(l(loc)) +} + +func (loc *RvLocation) UnmarshalText(text []byte) error { + type l RvLocation + return json.Unmarshal(text, (*l)(loc)) +} + // Map from (regionId, ResourcePartitionId) to resourceVersion -type ResourceVersionMap map[location.Location]uint64 +// used in REST API calls +type TransitResourceVersionMap map[RvLocation]uint64 + +// internally used in the eventqueue used in WATCH of nodes +type InternalResourceVersionMap map[location.Location]uint64 + +func ConvertToInternalResourceVersionMap(rvs TransitResourceVersionMap) InternalResourceVersionMap { + internalMap := make(InternalResourceVersionMap) + + for k, v := range rvs { + internalMap[*location.NewLocation(k.Region, k.Partition)] = v + } + + return internalMap +} -func (rvs *ResourceVersionMap) Copy() ResourceVersionMap { - dupRVs := make(ResourceVersionMap, len(*rvs)) +func (rvs *TransitResourceVersionMap) Copy() TransitResourceVersionMap { + dupRVs := make(TransitResourceVersionMap, len(*rvs)) for loc, rv := range *rvs { dupRVs[loc] = rv } diff --git a/resource-management/pkg/common-lib/types/compositeresourceversion_test.go b/resource-management/pkg/common-lib/types/compositeresourceversion_test.go index 5953090b..3b6d25cc 100644 --- a/resource-management/pkg/common-lib/types/compositeresourceversion_test.go +++ b/resource-management/pkg/common-lib/types/compositeresourceversion_test.go @@ -8,9 +8,9 @@ import ( ) func TestResourceVersionMap_Marshall_UnMarshall(t *testing.T) { - rvs := make(ResourceVersionMap) - loc := location.NewLocation(location.Beijing, location.ResourcePartition1) - rvs[*loc] = 100 + rvs := make(TransitResourceVersionMap) + loc := RvLocation{Region: location.Beijing, Partition: location.ResourcePartition1} + rvs[loc] = 100 // marshall b, err := json.Marshal(rvs) @@ -18,10 +18,10 @@ func TestResourceVersionMap_Marshall_UnMarshall(t *testing.T) { assert.NotNil(t, b) // unmarshall - var newRVMap ResourceVersionMap + var newRVMap TransitResourceVersionMap err = json.Unmarshal(b, &newRVMap) assert.Nil(t, err) assert.NotNil(t, newRVMap) assert.Equal(t, 1, len(newRVMap)) - assert.Equal(t, uint64(100), newRVMap[*loc]) + assert.Equal(t, uint64(100), newRVMap[loc]) } diff --git a/resource-management/pkg/common-lib/types/location/location.go b/resource-management/pkg/common-lib/types/location/location.go index 56e5e6dc..41e05246 100644 --- a/resource-management/pkg/common-lib/types/location/location.go +++ b/resource-management/pkg/common-lib/types/location/location.go @@ -1,7 +1,6 @@ package location import ( - "encoding/json" "fmt" ) @@ -237,7 +236,7 @@ func GetRPNum() int { return len(ResourcePartitions) } -// TODO - read resource parition from configuration or metadata server +// TODO - read resource partition from configuration or metadata server func GetRPsForRegion(region Region) []ResourcePartition { rpsForRegion := make([]ResourcePartition, len(ResourcePartitions)) for i := 0; i < len(ResourcePartitions); i++ { @@ -266,13 +265,3 @@ func (loc *Location) Equal(locToCompare Location) bool { func (loc *Location) String() string { return fmt.Sprintf("[Region %s, ResoucePartition %s]", loc.region, loc.partition) } - -func (loc Location) MarshalText() (text []byte, err error) { - type l Location - return json.Marshal(l(loc)) -} - -func (loc *Location) UnmarshalText(text []byte) error { - type l Location - return json.Unmarshal(text, (*l)(loc)) -} diff --git a/resource-management/pkg/distributor/cache/eventqueue.go b/resource-management/pkg/distributor/cache/eventqueue.go index ba4b5ab3..06a28b57 100644 --- a/resource-management/pkg/distributor/cache/eventqueue.go +++ b/resource-management/pkg/distributor/cache/eventqueue.go @@ -140,9 +140,9 @@ func (eq *NodeEventQueue) EnqueueEvent(e *node.ManagedNodeEvent) { queueByLoc.enqueueEvent(e) } -func (eq *NodeEventQueue) Watch(rvs types.ResourceVersionMap, clientWatchChan chan *event.NodeEvent, stopCh chan struct{}) error { +func (eq *NodeEventQueue) Watch(rvs types.InternalResourceVersionMap, clientWatchChan chan *event.NodeEvent, stopCh chan struct{}) error { if eq.watchChan != nil { - return errors.New("Currently only support one watcher per node event queue.") + return errors.New("currently only support one watcher per node event queue") } // get events already in queues @@ -182,7 +182,7 @@ func (eq *NodeEventQueue) Watch(rvs types.ResourceVersionMap, clientWatchChan ch return nil } -func (eq *NodeEventQueue) getAllEventsSinceResourceVersion(rvs types.ResourceVersionMap) ([]*event.NodeEvent, error) { +func (eq *NodeEventQueue) getAllEventsSinceResourceVersion(rvs types.InternalResourceVersionMap) ([]*event.NodeEvent, error) { locStartPostitions := make(map[location.Location]int) for loc, rv := range rvs { diff --git a/resource-management/pkg/distributor/distributor.go b/resource-management/pkg/distributor/distributor.go index d34d5ec4..32e82de0 100644 --- a/resource-management/pkg/distributor/distributor.go +++ b/resource-management/pkg/distributor/distributor.go @@ -170,7 +170,9 @@ func (dis *ResourceDistributor) getSortedVirtualStores(stores map[*storage.Virtu return sortedStores } -func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.LogicalNode, types.ResourceVersionMap, error) { +// ListNodesForClient returns list of nodes for a client request and a RV map to the client for WATCH +// or error encountered during the node allocation/listing for the client +func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.LogicalNode, types.TransitResourceVersionMap, error) { if clientId == "" { return nil, nil, errors.New("Empty clientId") } @@ -187,7 +189,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo eventQueue.AcquireSnapshotRLock() nodesByStore := make([][]*types.LogicalNode, len(assignedStores)) - rvMapByStore := make([]types.ResourceVersionMap, len(assignedStores)) + rvMapByStore := make([]types.TransitResourceVersionMap, len(assignedStores)) hostCount := 0 for i := 0; i < len(assignedStores); i++ { nodesByStore[i], rvMapByStore[i] = assignedStores[i].SnapShot() @@ -206,7 +208,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo } // combine to single ResourceVersionMap - finalRVs := make(types.ResourceVersionMap) + finalRVs := make(types.TransitResourceVersionMap) for i := 0; i < len(rvMapByStore); i++ { currentRVs := rvMapByStore[i] for loc, rv := range currentRVs { @@ -223,7 +225,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo return nodes, finalRVs, nil } -func (dis *ResourceDistributor) Watch(clientId string, rvs types.ResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error { +func (dis *ResourceDistributor) Watch(clientId string, rvs types.TransitResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error { var nodeEventQueue *cache.NodeEventQueue var isOK bool if nodeEventQueue, isOK = dis.nodeEventQueueMap[clientId]; !isOK || nodeEventQueue == nil { @@ -239,10 +241,12 @@ func (dis *ResourceDistributor) Watch(clientId string, rvs types.ResourceVersion return errors.New("Stop watch channel not provided") } - return nodeEventQueue.Watch(rvs, watchChan, stopCh) + internal_rvs := types.ConvertToInternalResourceVersionMap(rvs) + + return nodeEventQueue.Watch(internal_rvs, watchChan, stopCh) } -func (dis *ResourceDistributor) ProcessEvents(events []*event.NodeEvent) (bool, types.ResourceVersionMap) { +func (dis *ResourceDistributor) ProcessEvents(events []*event.NodeEvent) (bool, types.TransitResourceVersionMap) { eventsToProcess := make([]*node.ManagedNodeEvent, len(events)) for i := 0; i < len(events); i++ { if events[i] != nil { diff --git a/resource-management/pkg/distributor/distributor_concurrency_test.go b/resource-management/pkg/distributor/distributor_concurrency_test.go index 376ea1bf..c9a82d94 100644 --- a/resource-management/pkg/distributor/distributor_concurrency_test.go +++ b/resource-management/pkg/distributor/distributor_concurrency_test.go @@ -78,7 +78,7 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { } // client list nodes - latestRVsByClient := make([]types.ResourceVersionMap, tt.clientNum) + latestRVsByClient := make([]types.TransitResourceVersionMap, tt.clientNum) nodesByClient := make([][]*types.LogicalNode, tt.clientNum) for i := 0; i < tt.clientNum; i++ { clientId := clientIds[i] @@ -97,9 +97,9 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { // check each node event nodeIds := make(map[string]bool) for _, node := range nodes { - nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition)) + nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)} assert.NotNil(t, nodeLoc) - assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64()) + assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64()) if _, isOK := nodeIds[node.Id]; isOK { assert.Fail(t, "List nodes cannot have more than one copy of a node") } else { @@ -262,7 +262,7 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) { duration += time.Since(start) // client list nodes - latestRVsByClient := make([]types.ResourceVersionMap, tt.clientNum) + latestRVsByClient := make([]types.TransitResourceVersionMap, tt.clientNum) nodesByClient := make([][]*types.LogicalNode, tt.clientNum) wg.Add(tt.clientNum) @@ -282,9 +282,9 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) { // check each node event nodeIds := make(map[string]bool) for _, node := range nodes { - nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition)) + nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)} assert.NotNil(t, nodeLoc) - assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64()) + assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64()) if _, isOK := nodeIds[node.Id]; isOK { assert.Fail(t, "List nodes cannot have more than one copy of a node") } else { diff --git a/resource-management/pkg/distributor/distributor_test.go b/resource-management/pkg/distributor/distributor_test.go index 19e49821..72906e67 100644 --- a/resource-management/pkg/distributor/distributor_test.go +++ b/resource-management/pkg/distributor/distributor_test.go @@ -339,9 +339,9 @@ func TestRegistrationWorkflow(t *testing.T) { // check each node event nodeIds := make(map[string]bool) for _, node := range nodes { - nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition)) + nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)} assert.NotNil(t, nodeLoc) - assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64()) + assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64()) if _, isOK := nodeIds[node.Id]; isOK { assert.Fail(t, "List nodes cannot have more than one copy of a node") } else { @@ -355,8 +355,9 @@ func TestRegistrationWorkflow(t *testing.T) { updateNodeEvents := generatedUpdateNodeEventsFromNodeList(nodes) result2, rvMap2 := distributor.ProcessEvents(updateNodeEvents) assert.True(t, result2, "Expecting update nodes successfully") - loc := location.NewLocation(location.Region(nodes[0].GeoInfo.Region), location.ResourcePartition(nodes[0].GeoInfo.ResourcePartition)) - assert.Equal(t, uint64(rvToGenerate), rvMap2[*loc]) + loc := types.RvLocation{Region: location.Region(nodes[0].GeoInfo.Region), Partition: location.ResourcePartition (nodes[0].GeoInfo.ResourcePartition)} + + assert.Equal(t, uint64(rvToGenerate), rvMap2[loc]) assert.Equal(t, oldNodeRV, nodes[0].GetResourceVersionInt64(), "Expecting listed nodes are snapshoted and cannot be affected by update") // client watch node update @@ -370,7 +371,8 @@ func TestRegistrationWorkflow(t *testing.T) { watchedEventCount := 0 for e := range watchCh { assert.Equal(t, event.Modified, e.Type) - nodeLoc := location.NewLocation(location.Region(e.Node.GeoInfo.Region), location.ResourcePartition(e.Node.GeoInfo.ResourcePartition)) + nodeLoc := types.RvLocation{Region: location.Region(e.Node.GeoInfo.Region), Partition: location.ResourcePartition (e.Node.GeoInfo.ResourcePartition)} + assert.Equal(t, loc, nodeLoc) watchedEventCount++ diff --git a/resource-management/pkg/distributor/node/managedNodeEvent.go b/resource-management/pkg/distributor/node/managedNodeEvent.go index 3e9f7726..d27c2a9b 100644 --- a/resource-management/pkg/distributor/node/managedNodeEvent.go +++ b/resource-management/pkg/distributor/node/managedNodeEvent.go @@ -41,6 +41,11 @@ func (n *ManagedNodeEvent) GetLocation() *location.Location { return n.loc } +func (n *ManagedNodeEvent) GetRvLocation() *types.RvLocation { + + return &types.RvLocation{Region: n.loc.GetRegion(), Partition: n.loc.GetResourcePartition()} +} + func (n *ManagedNodeEvent) GetResourceVersion() uint64 { rv, err := strconv.ParseUint(n.nodeEvent.Node.ResourceVersion, 10, 64) if err != nil { diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index 8b586770..a605a5e8 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -67,21 +67,23 @@ func (vs *VirtualNodeStore) GetRange() (float64, float64) { return vs.lowerbound, vs.upperbound } -func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.ResourceVersionMap) { +// Snapshot generates a list of node for the List() call from a client, and a current RV map to client +func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.TransitResourceVersionMap) { vs.mu.RLock() defer vs.mu.RUnlock() nodesCopy := make([]*types.LogicalNode, len(vs.nodeEventByHash)) index := 0 - rvs := make(types.ResourceVersionMap) + rvs := make(types.TransitResourceVersionMap) for _, node := range vs.nodeEventByHash { nodesCopy[index] = node.CopyNode() newRV := node.GetResourceVersion() - if lastRV, isOK := rvs[*node.GetLocation()]; isOK { + rvLoc := *node.GetRvLocation() + if lastRV, isOK := rvs[rvLoc]; isOK { if lastRV < newRV { - rvs[*node.GetLocation()] = newRV + rvs[rvLoc] = newRV } } else { - rvs[*node.GetLocation()] = newRV + rvs[rvLoc] = newRV } index++ } @@ -156,14 +158,14 @@ func NewNodeStore(vNodeNumPerRP int, regionNum int, partitionMaxNum int) *NodeSt } // TODO - verify whether the original value can be changed. If so, return a deepcopy -func (ns *NodeStore) GetCurrentResourceVersions() types.ResourceVersionMap { +func (ns *NodeStore) GetCurrentResourceVersions() types.TransitResourceVersionMap { ns.rvLock.RLock() defer ns.rvLock.RUnlock() - rvMap := make(types.ResourceVersionMap) + rvMap := make(types.TransitResourceVersionMap) for i := 0; i < ns.regionNum; i++ { for j := 0; j < ns.partitionMaxNum; j++ { if ns.currentRVs[i][j] > 0 { - rvMap[*location.NewLocation(location.Regions[i], location.ResourcePartitions[j])] = ns.currentRVs[i][j] + rvMap[types.RvLocation{Region: location.Regions[i], Partition: location.ResourcePartitions[j]}] = ns.currentRVs[i][j] } } } @@ -245,7 +247,7 @@ func (ns *NodeStore) UpdateNode(nodeEvent *node.ManagedNodeEvent) { func (ns NodeStore) DeleteNode(nodeEvent event.NodeEvent) { } -func (ns *NodeStore) ProcessNodeEvents(nodeEvents []*node.ManagedNodeEvent, persistHelper *DistributorPersistHelper) (bool, types.ResourceVersionMap) { +func (ns *NodeStore) ProcessNodeEvents(nodeEvents []*node.ManagedNodeEvent, persistHelper *DistributorPersistHelper) (bool, types.TransitResourceVersionMap) { persistHelper.SetWaitCount(len(nodeEvents)) for _, e := range nodeEvents { if e == nil { diff --git a/resource-management/pkg/service-api/endpoints/installer.go b/resource-management/pkg/service-api/endpoints/installer.go index ff9b89e8..2fa30ef9 100644 --- a/resource-management/pkg/service-api/endpoints/installer.go +++ b/resource-management/pkg/service-api/endpoints/installer.go @@ -248,7 +248,7 @@ func getClinetId(req *http.Request) string { return clientId } -func getResourceVersionsMap(req *http.Request) (types.ResourceVersionMap, error) { +func getResourceVersionsMap(req *http.Request) (types.TransitResourceVersionMap, error) { body, err := ioutil.ReadAll(req.Body) if err != nil { @@ -267,7 +267,7 @@ func getResourceVersionsMap(req *http.Request) (types.ResourceVersionMap, error) return wr.ResourceVersions, nil } -func (i *Installer) handleResponseTrunked(resp http.ResponseWriter, nodes []*types.LogicalNode, crv types.ResourceVersionMap, chunkSize int) { +func (i *Installer) handleResponseTrunked(resp http.ResponseWriter, nodes []*types.LogicalNode, crv types.TransitResourceVersionMap, chunkSize int) { responseTrunkSize := DefaultResponseTrunkSize if responseTrunkSize < chunkSize { responseTrunkSize = chunkSize diff --git a/resource-management/pkg/service-api/types/types.go b/resource-management/pkg/service-api/types/types.go index 1db19d2a..2e37e6a9 100644 --- a/resource-management/pkg/service-api/types/types.go +++ b/resource-management/pkg/service-api/types/types.go @@ -5,7 +5,7 @@ import "global-resource-service/resource-management/pkg/common-lib/types" // WatchRequest is the request body of the Watch API call // ResourceVersionMap is part of the return of the LIST API call type WatchRequest struct { - ResourceVersions types.ResourceVersionMap `json:"resource_versions"` + ResourceVersions types.TransitResourceVersionMap `json:"resource_versions"` } // ClientRegistrationRequest is the request body when a client register to the resource management service @@ -28,5 +28,5 @@ type ClientRegistrationResponse struct { // ResourceVersions are the list of RVs from each RP type ListNodeResponse struct { NodeList []*types.LogicalNode `json:"node_list",omitempty` - ResourceVersions types.ResourceVersionMap `json:"resource_version_map,omitempty"` + ResourceVersions types.TransitResourceVersionMap `json:"resource_version_map,omitempty"` } diff --git a/resource-management/pkg/store/redis/redis_test.go b/resource-management/pkg/store/redis/redis_test.go index c39d3286..0def4a59 100644 --- a/resource-management/pkg/store/redis/redis_test.go +++ b/resource-management/pkg/store/redis/redis_test.go @@ -84,7 +84,7 @@ func TestPersistNodes(t *testing.T) { // Simply Test Persist Node Store Status // func TestPersistNodeStoreStatus(t *testing.T) { - var CRV = make(types.ResourceVersionMap, 1) + var CRV = make(types.TransitResourceVersionMap, 1) testLocation := location.NewLocation(location.Beijing, location.ResourcePartition1) CRV[*testLocation] = 1000 diff --git a/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go b/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go index ed3a443b..0e883d49 100644 --- a/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go +++ b/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go @@ -82,7 +82,7 @@ func GetRegionNodeAddedEvents(batchLength uint64) (simulatorTypes.RegionNodeEven // Return region node modified events with CRVs in BATCH LENGTH from all RPs // TO DO: paginate support // -func GetRegionNodeModifiedEventsCRV(rvs types.ResourceVersionMap) (simulatorTypes.RegionNodeEvents, uint64) { +func GetRegionNodeModifiedEventsCRV(rvs types.TransitResourceVersionMap) (simulatorTypes.RegionNodeEvents, uint64) { snapshotNodeListEvents = RegionNodeEventsList pulledNodeListEvents := make(simulatorTypes.RegionNodeEvents, RpNum) @@ -93,9 +93,9 @@ func GetRegionNodeModifiedEventsCRV(rvs types.ResourceVersionMap) (simulatorType for i := 0; i < NodesPerRP; i++ { region := snapshotNodeListEvents[j][i].Node.GeoInfo.Region rp := snapshotNodeListEvents[j][i].Node.GeoInfo.ResourcePartition - loc := location.NewLocation(location.Region(region), location.ResourcePartition(rp)) + loc := types.RvLocation{Region: location.Region(region), Partition: location.ResourcePartition(rp)} - if snapshotNodeListEvents[j][i].Node.GetResourceVersionInt64() > rvs[*loc] { + if snapshotNodeListEvents[j][i].Node.GetResourceVersionInt64() > rvs[loc] { count += 1 pulledNodeListEventsPerRP[indexPerRP] = snapshotNodeListEvents[j][i] indexPerRP += 1 diff --git a/resource-management/test/resourceRegionMgrSimulator/types/types.go b/resource-management/test/resourceRegionMgrSimulator/types/types.go index 64087bf2..915b8754 100644 --- a/resource-management/test/resourceRegionMgrSimulator/types/types.go +++ b/resource-management/test/resourceRegionMgrSimulator/types/types.go @@ -16,7 +16,7 @@ type PostCRVstatus bool // type ResponseFromRRM struct { RegionNodeEvents [][]*event.NodeEvent - RvMap types.ResourceVersionMap + RvMap types.TransitResourceVersionMap Length uint64 } @@ -25,7 +25,7 @@ type ResponseFromRRM struct { type PullDataFromRRM struct { BatchLength uint64 DefaultCRV uint64 - CRV types.ResourceVersionMap + CRV types.TransitResourceVersionMap } // ToJSON serializes the contents of the collection to JSON