From 0867cbc2032d2761b585fdcf4e601578ef877c9c Mon Sep 17 00:00:00 2001 From: Yunwen Bai Date: Wed, 29 Jun 2022 23:27:04 -0700 Subject: [PATCH 1/4] add new type for CRV in transit --- go.mod | 5 --- .../types/compositeresourceversion.go | 33 ++++++++++++++++++- .../types/compositeresourceversion_test.go | 6 ++-- .../pkg/common-lib/types/location/location.go | 13 +------- .../pkg/distributor/cache/eventqueue.go | 6 ++-- .../pkg/distributor/distributor.go | 6 +++- .../distributor_concurrency_test.go | 8 ++--- .../pkg/distributor/distributor_test.go | 12 ++++--- .../pkg/distributor/node/managedNodeEvent.go | 5 +++ .../pkg/distributor/storage/nodestore.go | 9 ++--- .../data/regionNodeEvents.go | 4 +-- 11 files changed, 67 insertions(+), 40 deletions(-) diff --git a/go.mod b/go.mod index 92e72e16..814638db 100644 --- a/go.mod +++ b/go.mod @@ -11,13 +11,8 @@ require ( ) require ( - github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/go-logr/logr v1.2.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/resource-management/pkg/common-lib/types/compositeresourceversion.go b/resource-management/pkg/common-lib/types/compositeresourceversion.go index 7e81133e..4faf3157 100644 --- a/resource-management/pkg/common-lib/types/compositeresourceversion.go +++ b/resource-management/pkg/common-lib/types/compositeresourceversion.go @@ -1,6 +1,8 @@ package types import ( + "encoding/json" + "global-resource-service/resource-management/pkg/common-lib/types/location" ) @@ -10,8 +12,37 @@ type CompositeResourceVersion struct { ResourceVersion uint64 } +type RvLocation struct { + Region location.Region + Partition location.ResourcePartition +} + +func (loc RvLocation) MarshalText() (text []byte, err error) { + type l RvLocation + return json.Marshal(l(loc)) +} + +func (loc *RvLocation) UnmarshalText(text []byte) error { + type l RvLocation + return json.Unmarshal(text, (*l)(loc)) +} + // Map from (regionId, ResourcePartitionId) to resourceVersion -type ResourceVersionMap map[location.Location]uint64 +// used in REST API calls +type ResourceVersionMap map[RvLocation]uint64 + +// internally used in the eventqueue used in WATCH of nodes +type InternalResourceVersionMap map[location.Location]uint64 + +func ConvertToInternalResourceVersionMap(rvs ResourceVersionMap) InternalResourceVersionMap { + internalMap := make(InternalResourceVersionMap) + + for k, v := range rvs { + internalMap[*location.NewLocation(k.Region, k.Partition)] = v + } + + return internalMap +} func (rvs *ResourceVersionMap) Copy() ResourceVersionMap { dupRVs := make(ResourceVersionMap, len(*rvs)) diff --git a/resource-management/pkg/common-lib/types/compositeresourceversion_test.go b/resource-management/pkg/common-lib/types/compositeresourceversion_test.go index 5953090b..d481a4a2 100644 --- a/resource-management/pkg/common-lib/types/compositeresourceversion_test.go +++ b/resource-management/pkg/common-lib/types/compositeresourceversion_test.go @@ -9,8 +9,8 @@ import ( func TestResourceVersionMap_Marshall_UnMarshall(t *testing.T) { rvs := make(ResourceVersionMap) - loc := location.NewLocation(location.Beijing, location.ResourcePartition1) - rvs[*loc] = 100 + loc := RvLocation{Region: location.Beijing, Partition: location.ResourcePartition1} + rvs[loc] = 100 // marshall b, err := json.Marshal(rvs) @@ -23,5 +23,5 @@ func TestResourceVersionMap_Marshall_UnMarshall(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, newRVMap) assert.Equal(t, 1, len(newRVMap)) - assert.Equal(t, uint64(100), newRVMap[*loc]) + assert.Equal(t, uint64(100), newRVMap[loc]) } diff --git a/resource-management/pkg/common-lib/types/location/location.go b/resource-management/pkg/common-lib/types/location/location.go index 56e5e6dc..41e05246 100644 --- a/resource-management/pkg/common-lib/types/location/location.go +++ b/resource-management/pkg/common-lib/types/location/location.go @@ -1,7 +1,6 @@ package location import ( - "encoding/json" "fmt" ) @@ -237,7 +236,7 @@ func GetRPNum() int { return len(ResourcePartitions) } -// TODO - read resource parition from configuration or metadata server +// TODO - read resource partition from configuration or metadata server func GetRPsForRegion(region Region) []ResourcePartition { rpsForRegion := make([]ResourcePartition, len(ResourcePartitions)) for i := 0; i < len(ResourcePartitions); i++ { @@ -266,13 +265,3 @@ func (loc *Location) Equal(locToCompare Location) bool { func (loc *Location) String() string { return fmt.Sprintf("[Region %s, ResoucePartition %s]", loc.region, loc.partition) } - -func (loc Location) MarshalText() (text []byte, err error) { - type l Location - return json.Marshal(l(loc)) -} - -func (loc *Location) UnmarshalText(text []byte) error { - type l Location - return json.Unmarshal(text, (*l)(loc)) -} diff --git a/resource-management/pkg/distributor/cache/eventqueue.go b/resource-management/pkg/distributor/cache/eventqueue.go index ba4b5ab3..06a28b57 100644 --- a/resource-management/pkg/distributor/cache/eventqueue.go +++ b/resource-management/pkg/distributor/cache/eventqueue.go @@ -140,9 +140,9 @@ func (eq *NodeEventQueue) EnqueueEvent(e *node.ManagedNodeEvent) { queueByLoc.enqueueEvent(e) } -func (eq *NodeEventQueue) Watch(rvs types.ResourceVersionMap, clientWatchChan chan *event.NodeEvent, stopCh chan struct{}) error { +func (eq *NodeEventQueue) Watch(rvs types.InternalResourceVersionMap, clientWatchChan chan *event.NodeEvent, stopCh chan struct{}) error { if eq.watchChan != nil { - return errors.New("Currently only support one watcher per node event queue.") + return errors.New("currently only support one watcher per node event queue") } // get events already in queues @@ -182,7 +182,7 @@ func (eq *NodeEventQueue) Watch(rvs types.ResourceVersionMap, clientWatchChan ch return nil } -func (eq *NodeEventQueue) getAllEventsSinceResourceVersion(rvs types.ResourceVersionMap) ([]*event.NodeEvent, error) { +func (eq *NodeEventQueue) getAllEventsSinceResourceVersion(rvs types.InternalResourceVersionMap) ([]*event.NodeEvent, error) { locStartPostitions := make(map[location.Location]int) for loc, rv := range rvs { diff --git a/resource-management/pkg/distributor/distributor.go b/resource-management/pkg/distributor/distributor.go index d34d5ec4..96fdab38 100644 --- a/resource-management/pkg/distributor/distributor.go +++ b/resource-management/pkg/distributor/distributor.go @@ -170,6 +170,8 @@ func (dis *ResourceDistributor) getSortedVirtualStores(stores map[*storage.Virtu return sortedStores } +// ListNodesForClient returns list of nodes for a client request and a RV map to the client for WATCH +// or error encountered during the node allocation/listing for the client func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.LogicalNode, types.ResourceVersionMap, error) { if clientId == "" { return nil, nil, errors.New("Empty clientId") @@ -239,7 +241,9 @@ func (dis *ResourceDistributor) Watch(clientId string, rvs types.ResourceVersion return errors.New("Stop watch channel not provided") } - return nodeEventQueue.Watch(rvs, watchChan, stopCh) + internal_rvs := types.ConvertToInternalResourceVersionMap(rvs) + + return nodeEventQueue.Watch(internal_rvs, watchChan, stopCh) } func (dis *ResourceDistributor) ProcessEvents(events []*event.NodeEvent) (bool, types.ResourceVersionMap) { diff --git a/resource-management/pkg/distributor/distributor_concurrency_test.go b/resource-management/pkg/distributor/distributor_concurrency_test.go index 376ea1bf..b05e388a 100644 --- a/resource-management/pkg/distributor/distributor_concurrency_test.go +++ b/resource-management/pkg/distributor/distributor_concurrency_test.go @@ -97,9 +97,9 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { // check each node event nodeIds := make(map[string]bool) for _, node := range nodes { - nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition)) + nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)} assert.NotNil(t, nodeLoc) - assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64()) + assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64()) if _, isOK := nodeIds[node.Id]; isOK { assert.Fail(t, "List nodes cannot have more than one copy of a node") } else { @@ -282,9 +282,9 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) { // check each node event nodeIds := make(map[string]bool) for _, node := range nodes { - nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition)) + nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)} assert.NotNil(t, nodeLoc) - assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64()) + assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64()) if _, isOK := nodeIds[node.Id]; isOK { assert.Fail(t, "List nodes cannot have more than one copy of a node") } else { diff --git a/resource-management/pkg/distributor/distributor_test.go b/resource-management/pkg/distributor/distributor_test.go index 19e49821..72906e67 100644 --- a/resource-management/pkg/distributor/distributor_test.go +++ b/resource-management/pkg/distributor/distributor_test.go @@ -339,9 +339,9 @@ func TestRegistrationWorkflow(t *testing.T) { // check each node event nodeIds := make(map[string]bool) for _, node := range nodes { - nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition)) + nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)} assert.NotNil(t, nodeLoc) - assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64()) + assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64()) if _, isOK := nodeIds[node.Id]; isOK { assert.Fail(t, "List nodes cannot have more than one copy of a node") } else { @@ -355,8 +355,9 @@ func TestRegistrationWorkflow(t *testing.T) { updateNodeEvents := generatedUpdateNodeEventsFromNodeList(nodes) result2, rvMap2 := distributor.ProcessEvents(updateNodeEvents) assert.True(t, result2, "Expecting update nodes successfully") - loc := location.NewLocation(location.Region(nodes[0].GeoInfo.Region), location.ResourcePartition(nodes[0].GeoInfo.ResourcePartition)) - assert.Equal(t, uint64(rvToGenerate), rvMap2[*loc]) + loc := types.RvLocation{Region: location.Region(nodes[0].GeoInfo.Region), Partition: location.ResourcePartition (nodes[0].GeoInfo.ResourcePartition)} + + assert.Equal(t, uint64(rvToGenerate), rvMap2[loc]) assert.Equal(t, oldNodeRV, nodes[0].GetResourceVersionInt64(), "Expecting listed nodes are snapshoted and cannot be affected by update") // client watch node update @@ -370,7 +371,8 @@ func TestRegistrationWorkflow(t *testing.T) { watchedEventCount := 0 for e := range watchCh { assert.Equal(t, event.Modified, e.Type) - nodeLoc := location.NewLocation(location.Region(e.Node.GeoInfo.Region), location.ResourcePartition(e.Node.GeoInfo.ResourcePartition)) + nodeLoc := types.RvLocation{Region: location.Region(e.Node.GeoInfo.Region), Partition: location.ResourcePartition (e.Node.GeoInfo.ResourcePartition)} + assert.Equal(t, loc, nodeLoc) watchedEventCount++ diff --git a/resource-management/pkg/distributor/node/managedNodeEvent.go b/resource-management/pkg/distributor/node/managedNodeEvent.go index 3e9f7726..d27c2a9b 100644 --- a/resource-management/pkg/distributor/node/managedNodeEvent.go +++ b/resource-management/pkg/distributor/node/managedNodeEvent.go @@ -41,6 +41,11 @@ func (n *ManagedNodeEvent) GetLocation() *location.Location { return n.loc } +func (n *ManagedNodeEvent) GetRvLocation() *types.RvLocation { + + return &types.RvLocation{Region: n.loc.GetRegion(), Partition: n.loc.GetResourcePartition()} +} + func (n *ManagedNodeEvent) GetResourceVersion() uint64 { rv, err := strconv.ParseUint(n.nodeEvent.Node.ResourceVersion, 10, 64) if err != nil { diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index 8b586770..e2b22e07 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -67,6 +67,7 @@ func (vs *VirtualNodeStore) GetRange() (float64, float64) { return vs.lowerbound, vs.upperbound } +// Snapshot generates a list of node for the List() call from a client, and a current RV map to client func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.ResourceVersionMap) { vs.mu.RLock() defer vs.mu.RUnlock() @@ -76,12 +77,12 @@ func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.ResourceVers for _, node := range vs.nodeEventByHash { nodesCopy[index] = node.CopyNode() newRV := node.GetResourceVersion() - if lastRV, isOK := rvs[*node.GetLocation()]; isOK { + if lastRV, isOK := rvs[*node.GetRvLocation()]; isOK { if lastRV < newRV { - rvs[*node.GetLocation()] = newRV + rvs[*node.GetRvLocation()] = newRV } } else { - rvs[*node.GetLocation()] = newRV + rvs[*node.GetRvLocation()] = newRV } index++ } @@ -163,7 +164,7 @@ func (ns *NodeStore) GetCurrentResourceVersions() types.ResourceVersionMap { for i := 0; i < ns.regionNum; i++ { for j := 0; j < ns.partitionMaxNum; j++ { if ns.currentRVs[i][j] > 0 { - rvMap[*location.NewLocation(location.Regions[i], location.ResourcePartitions[j])] = ns.currentRVs[i][j] + rvMap[types.RvLocation{Region:location.Regions[i], Partition: location.ResourcePartitions[j]}] = ns.currentRVs[i][j] } } } diff --git a/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go b/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go index ed3a443b..9d140d4f 100644 --- a/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go +++ b/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go @@ -93,9 +93,9 @@ func GetRegionNodeModifiedEventsCRV(rvs types.ResourceVersionMap) (simulatorType for i := 0; i < NodesPerRP; i++ { region := snapshotNodeListEvents[j][i].Node.GeoInfo.Region rp := snapshotNodeListEvents[j][i].Node.GeoInfo.ResourcePartition - loc := location.NewLocation(location.Region(region), location.ResourcePartition(rp)) + loc := types.RvLocation{Region: location.Region(region), Partition: location.ResourcePartition(rp)} - if snapshotNodeListEvents[j][i].Node.GetResourceVersionInt64() > rvs[*loc] { + if snapshotNodeListEvents[j][i].Node.GetResourceVersionInt64() > rvs[loc] { count += 1 pulledNodeListEventsPerRP[indexPerRP] = snapshotNodeListEvents[j][i] indexPerRP += 1 From 1f881bbd6105cb1d4f26cd94cbc23a32ff80718c Mon Sep 17 00:00:00 2001 From: Yunwen Bai Date: Fri, 1 Jul 2022 01:03:04 -0700 Subject: [PATCH 2/4] RV map in transit and internal explictly named --- resource-management/pkg/aggregrator/aggregator.go | 10 +++++----- .../common-lib/interfaces/distributor/interfaces.go | 6 +++--- .../pkg/common-lib/interfaces/store/interface.go | 2 +- .../pkg/common-lib/types/compositeresourceversion.go | 8 ++++---- .../common-lib/types/compositeresourceversion_test.go | 4 ++-- resource-management/pkg/distributor/distributor.go | 10 +++++----- .../pkg/distributor/distributor_concurrency_test.go | 4 ++-- .../pkg/distributor/storage/nodestore.go | 10 +++++----- .../pkg/service-api/endpoints/installer.go | 4 ++-- resource-management/pkg/service-api/types/types.go | 4 ++-- resource-management/pkg/store/redis/redis_test.go | 2 +- .../data/regionNodeEvents.go | 2 +- .../test/resourceRegionMgrSimulator/types/types.go | 4 ++-- 13 files changed, 35 insertions(+), 35 deletions(-) diff --git a/resource-management/pkg/aggregrator/aggregator.go b/resource-management/pkg/aggregrator/aggregator.go index af9f21a8..b229bc43 100644 --- a/resource-management/pkg/aggregrator/aggregator.go +++ b/resource-management/pkg/aggregrator/aggregator.go @@ -32,7 +32,7 @@ type ClientOfRRM struct { // type ResponseFromRRM struct { RegionNodeEvents [][]*event.NodeEvent - RvMap types.ResourceVersionMap + RvMap types.TransitResourceVersionMap Length uint64 } @@ -41,7 +41,7 @@ type ResponseFromRRM struct { type PullDataFromRRM struct { BatchLength uint64 DefaultCRV uint64 - CRV types.ResourceVersionMap + CRV types.TransitResourceVersionMap } const ( @@ -76,7 +76,7 @@ func (a *Aggregator) Run() (err error) { klog.V(3).Infof("Existing goroutine for region: %v", a.urls[i]) }() - var crv types.ResourceVersionMap + var crv types.TransitResourceVersionMap var regionNodeEvents [][]*event.NodeEvent var length uint64 var eventProcess bool @@ -138,7 +138,7 @@ func (a *Aggregator) createClient(url string) *ClientOfRRM { // or // Call the resource region manager's SubsequentPull method {url}/resources/subsequentpull when crv is not nil // -func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64, crv types.ResourceVersionMap) ([][]*event.NodeEvent, uint64) { +func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64, crv types.TransitResourceVersionMap) ([][]*event.NodeEvent, uint64) { var path string if len(crv) == 0 { @@ -183,7 +183,7 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64 // Call resource region manager's POST method {url}/resources/crv to update the CRV // error indicate failed POST, CRV means Composite Resource Version // -func (a *Aggregator) postCRV(c *ClientOfRRM, crv types.ResourceVersionMap) error { +func (a *Aggregator) postCRV(c *ClientOfRRM, crv types.TransitResourceVersionMap) error { path := httpPrefix + c.BaseURL + "/resources/crv" bytes, _ := json.Marshal(PullDataFromRRM{CRV: crv.Copy()}) req, err := http.NewRequest(http.MethodPost, path, strings.NewReader((string(bytes)))) diff --git a/resource-management/pkg/common-lib/interfaces/distributor/interfaces.go b/resource-management/pkg/common-lib/interfaces/distributor/interfaces.go index 4698e48a..d0527404 100644 --- a/resource-management/pkg/common-lib/interfaces/distributor/interfaces.go +++ b/resource-management/pkg/common-lib/interfaces/distributor/interfaces.go @@ -8,7 +8,7 @@ import ( type Interface interface { RegisterClient(*types.Client) error - ListNodesForClient(clientId string) ([]*types.LogicalNode, types.ResourceVersionMap, error) - Watch(clientId string, rvs types.ResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error - ProcessEvents(events []*event.NodeEvent) (bool, types.ResourceVersionMap) + ListNodesForClient(clientId string) ([]*types.LogicalNode, types.TransitResourceVersionMap, error) + Watch(clientId string, rvs types.TransitResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error + ProcessEvents(events []*event.NodeEvent) (bool, types.TransitResourceVersionMap) } diff --git a/resource-management/pkg/common-lib/interfaces/store/interface.go b/resource-management/pkg/common-lib/interfaces/store/interface.go index 5b204f9c..ba55ea65 100644 --- a/resource-management/pkg/common-lib/interfaces/store/interface.go +++ b/resource-management/pkg/common-lib/interfaces/store/interface.go @@ -36,7 +36,7 @@ type NodeStoreStatus struct { VirtualNodeNumPerRP int // Latest resource version map - CurrentResourceVerions types.ResourceVersionMap + CurrentResourceVerions types.TransitResourceVersionMap } func (nsStatus *NodeStoreStatus) GetKey() string { diff --git a/resource-management/pkg/common-lib/types/compositeresourceversion.go b/resource-management/pkg/common-lib/types/compositeresourceversion.go index 4faf3157..b9acce4c 100644 --- a/resource-management/pkg/common-lib/types/compositeresourceversion.go +++ b/resource-management/pkg/common-lib/types/compositeresourceversion.go @@ -29,12 +29,12 @@ func (loc *RvLocation) UnmarshalText(text []byte) error { // Map from (regionId, ResourcePartitionId) to resourceVersion // used in REST API calls -type ResourceVersionMap map[RvLocation]uint64 +type TransitResourceVersionMap map[RvLocation]uint64 // internally used in the eventqueue used in WATCH of nodes type InternalResourceVersionMap map[location.Location]uint64 -func ConvertToInternalResourceVersionMap(rvs ResourceVersionMap) InternalResourceVersionMap { +func ConvertToInternalResourceVersionMap(rvs TransitResourceVersionMap) InternalResourceVersionMap { internalMap := make(InternalResourceVersionMap) for k, v := range rvs { @@ -44,8 +44,8 @@ func ConvertToInternalResourceVersionMap(rvs ResourceVersionMap) InternalResourc return internalMap } -func (rvs *ResourceVersionMap) Copy() ResourceVersionMap { - dupRVs := make(ResourceVersionMap, len(*rvs)) +func (rvs *TransitResourceVersionMap) Copy() TransitResourceVersionMap { + dupRVs := make(TransitResourceVersionMap, len(*rvs)) for loc, rv := range *rvs { dupRVs[loc] = rv } diff --git a/resource-management/pkg/common-lib/types/compositeresourceversion_test.go b/resource-management/pkg/common-lib/types/compositeresourceversion_test.go index d481a4a2..3b6d25cc 100644 --- a/resource-management/pkg/common-lib/types/compositeresourceversion_test.go +++ b/resource-management/pkg/common-lib/types/compositeresourceversion_test.go @@ -8,7 +8,7 @@ import ( ) func TestResourceVersionMap_Marshall_UnMarshall(t *testing.T) { - rvs := make(ResourceVersionMap) + rvs := make(TransitResourceVersionMap) loc := RvLocation{Region: location.Beijing, Partition: location.ResourcePartition1} rvs[loc] = 100 @@ -18,7 +18,7 @@ func TestResourceVersionMap_Marshall_UnMarshall(t *testing.T) { assert.NotNil(t, b) // unmarshall - var newRVMap ResourceVersionMap + var newRVMap TransitResourceVersionMap err = json.Unmarshal(b, &newRVMap) assert.Nil(t, err) assert.NotNil(t, newRVMap) diff --git a/resource-management/pkg/distributor/distributor.go b/resource-management/pkg/distributor/distributor.go index 96fdab38..32e82de0 100644 --- a/resource-management/pkg/distributor/distributor.go +++ b/resource-management/pkg/distributor/distributor.go @@ -172,7 +172,7 @@ func (dis *ResourceDistributor) getSortedVirtualStores(stores map[*storage.Virtu // ListNodesForClient returns list of nodes for a client request and a RV map to the client for WATCH // or error encountered during the node allocation/listing for the client -func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.LogicalNode, types.ResourceVersionMap, error) { +func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.LogicalNode, types.TransitResourceVersionMap, error) { if clientId == "" { return nil, nil, errors.New("Empty clientId") } @@ -189,7 +189,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo eventQueue.AcquireSnapshotRLock() nodesByStore := make([][]*types.LogicalNode, len(assignedStores)) - rvMapByStore := make([]types.ResourceVersionMap, len(assignedStores)) + rvMapByStore := make([]types.TransitResourceVersionMap, len(assignedStores)) hostCount := 0 for i := 0; i < len(assignedStores); i++ { nodesByStore[i], rvMapByStore[i] = assignedStores[i].SnapShot() @@ -208,7 +208,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo } // combine to single ResourceVersionMap - finalRVs := make(types.ResourceVersionMap) + finalRVs := make(types.TransitResourceVersionMap) for i := 0; i < len(rvMapByStore); i++ { currentRVs := rvMapByStore[i] for loc, rv := range currentRVs { @@ -225,7 +225,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo return nodes, finalRVs, nil } -func (dis *ResourceDistributor) Watch(clientId string, rvs types.ResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error { +func (dis *ResourceDistributor) Watch(clientId string, rvs types.TransitResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error { var nodeEventQueue *cache.NodeEventQueue var isOK bool if nodeEventQueue, isOK = dis.nodeEventQueueMap[clientId]; !isOK || nodeEventQueue == nil { @@ -246,7 +246,7 @@ func (dis *ResourceDistributor) Watch(clientId string, rvs types.ResourceVersion return nodeEventQueue.Watch(internal_rvs, watchChan, stopCh) } -func (dis *ResourceDistributor) ProcessEvents(events []*event.NodeEvent) (bool, types.ResourceVersionMap) { +func (dis *ResourceDistributor) ProcessEvents(events []*event.NodeEvent) (bool, types.TransitResourceVersionMap) { eventsToProcess := make([]*node.ManagedNodeEvent, len(events)) for i := 0; i < len(events); i++ { if events[i] != nil { diff --git a/resource-management/pkg/distributor/distributor_concurrency_test.go b/resource-management/pkg/distributor/distributor_concurrency_test.go index b05e388a..c9a82d94 100644 --- a/resource-management/pkg/distributor/distributor_concurrency_test.go +++ b/resource-management/pkg/distributor/distributor_concurrency_test.go @@ -78,7 +78,7 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { } // client list nodes - latestRVsByClient := make([]types.ResourceVersionMap, tt.clientNum) + latestRVsByClient := make([]types.TransitResourceVersionMap, tt.clientNum) nodesByClient := make([][]*types.LogicalNode, tt.clientNum) for i := 0; i < tt.clientNum; i++ { clientId := clientIds[i] @@ -262,7 +262,7 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) { duration += time.Since(start) // client list nodes - latestRVsByClient := make([]types.ResourceVersionMap, tt.clientNum) + latestRVsByClient := make([]types.TransitResourceVersionMap, tt.clientNum) nodesByClient := make([][]*types.LogicalNode, tt.clientNum) wg.Add(tt.clientNum) diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index e2b22e07..bdb48f6c 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -68,12 +68,12 @@ func (vs *VirtualNodeStore) GetRange() (float64, float64) { } // Snapshot generates a list of node for the List() call from a client, and a current RV map to client -func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.ResourceVersionMap) { +func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.TransitResourceVersionMap) { vs.mu.RLock() defer vs.mu.RUnlock() nodesCopy := make([]*types.LogicalNode, len(vs.nodeEventByHash)) index := 0 - rvs := make(types.ResourceVersionMap) + rvs := make(types.TransitResourceVersionMap) for _, node := range vs.nodeEventByHash { nodesCopy[index] = node.CopyNode() newRV := node.GetResourceVersion() @@ -157,10 +157,10 @@ func NewNodeStore(vNodeNumPerRP int, regionNum int, partitionMaxNum int) *NodeSt } // TODO - verify whether the original value can be changed. If so, return a deepcopy -func (ns *NodeStore) GetCurrentResourceVersions() types.ResourceVersionMap { +func (ns *NodeStore) GetCurrentResourceVersions() types.TransitResourceVersionMap { ns.rvLock.RLock() defer ns.rvLock.RUnlock() - rvMap := make(types.ResourceVersionMap) + rvMap := make(types.TransitResourceVersionMap) for i := 0; i < ns.regionNum; i++ { for j := 0; j < ns.partitionMaxNum; j++ { if ns.currentRVs[i][j] > 0 { @@ -246,7 +246,7 @@ func (ns *NodeStore) UpdateNode(nodeEvent *node.ManagedNodeEvent) { func (ns NodeStore) DeleteNode(nodeEvent event.NodeEvent) { } -func (ns *NodeStore) ProcessNodeEvents(nodeEvents []*node.ManagedNodeEvent, persistHelper *DistributorPersistHelper) (bool, types.ResourceVersionMap) { +func (ns *NodeStore) ProcessNodeEvents(nodeEvents []*node.ManagedNodeEvent, persistHelper *DistributorPersistHelper) (bool, types.TransitResourceVersionMap) { persistHelper.SetWaitCount(len(nodeEvents)) for _, e := range nodeEvents { if e == nil { diff --git a/resource-management/pkg/service-api/endpoints/installer.go b/resource-management/pkg/service-api/endpoints/installer.go index ff9b89e8..2fa30ef9 100644 --- a/resource-management/pkg/service-api/endpoints/installer.go +++ b/resource-management/pkg/service-api/endpoints/installer.go @@ -248,7 +248,7 @@ func getClinetId(req *http.Request) string { return clientId } -func getResourceVersionsMap(req *http.Request) (types.ResourceVersionMap, error) { +func getResourceVersionsMap(req *http.Request) (types.TransitResourceVersionMap, error) { body, err := ioutil.ReadAll(req.Body) if err != nil { @@ -267,7 +267,7 @@ func getResourceVersionsMap(req *http.Request) (types.ResourceVersionMap, error) return wr.ResourceVersions, nil } -func (i *Installer) handleResponseTrunked(resp http.ResponseWriter, nodes []*types.LogicalNode, crv types.ResourceVersionMap, chunkSize int) { +func (i *Installer) handleResponseTrunked(resp http.ResponseWriter, nodes []*types.LogicalNode, crv types.TransitResourceVersionMap, chunkSize int) { responseTrunkSize := DefaultResponseTrunkSize if responseTrunkSize < chunkSize { responseTrunkSize = chunkSize diff --git a/resource-management/pkg/service-api/types/types.go b/resource-management/pkg/service-api/types/types.go index 1db19d2a..2e37e6a9 100644 --- a/resource-management/pkg/service-api/types/types.go +++ b/resource-management/pkg/service-api/types/types.go @@ -5,7 +5,7 @@ import "global-resource-service/resource-management/pkg/common-lib/types" // WatchRequest is the request body of the Watch API call // ResourceVersionMap is part of the return of the LIST API call type WatchRequest struct { - ResourceVersions types.ResourceVersionMap `json:"resource_versions"` + ResourceVersions types.TransitResourceVersionMap `json:"resource_versions"` } // ClientRegistrationRequest is the request body when a client register to the resource management service @@ -28,5 +28,5 @@ type ClientRegistrationResponse struct { // ResourceVersions are the list of RVs from each RP type ListNodeResponse struct { NodeList []*types.LogicalNode `json:"node_list",omitempty` - ResourceVersions types.ResourceVersionMap `json:"resource_version_map,omitempty"` + ResourceVersions types.TransitResourceVersionMap `json:"resource_version_map,omitempty"` } diff --git a/resource-management/pkg/store/redis/redis_test.go b/resource-management/pkg/store/redis/redis_test.go index c39d3286..0def4a59 100644 --- a/resource-management/pkg/store/redis/redis_test.go +++ b/resource-management/pkg/store/redis/redis_test.go @@ -84,7 +84,7 @@ func TestPersistNodes(t *testing.T) { // Simply Test Persist Node Store Status // func TestPersistNodeStoreStatus(t *testing.T) { - var CRV = make(types.ResourceVersionMap, 1) + var CRV = make(types.TransitResourceVersionMap, 1) testLocation := location.NewLocation(location.Beijing, location.ResourcePartition1) CRV[*testLocation] = 1000 diff --git a/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go b/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go index 9d140d4f..0e883d49 100644 --- a/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go +++ b/resource-management/test/resourceRegionMgrSimulator/data/regionNodeEvents.go @@ -82,7 +82,7 @@ func GetRegionNodeAddedEvents(batchLength uint64) (simulatorTypes.RegionNodeEven // Return region node modified events with CRVs in BATCH LENGTH from all RPs // TO DO: paginate support // -func GetRegionNodeModifiedEventsCRV(rvs types.ResourceVersionMap) (simulatorTypes.RegionNodeEvents, uint64) { +func GetRegionNodeModifiedEventsCRV(rvs types.TransitResourceVersionMap) (simulatorTypes.RegionNodeEvents, uint64) { snapshotNodeListEvents = RegionNodeEventsList pulledNodeListEvents := make(simulatorTypes.RegionNodeEvents, RpNum) diff --git a/resource-management/test/resourceRegionMgrSimulator/types/types.go b/resource-management/test/resourceRegionMgrSimulator/types/types.go index 64087bf2..915b8754 100644 --- a/resource-management/test/resourceRegionMgrSimulator/types/types.go +++ b/resource-management/test/resourceRegionMgrSimulator/types/types.go @@ -16,7 +16,7 @@ type PostCRVstatus bool // type ResponseFromRRM struct { RegionNodeEvents [][]*event.NodeEvent - RvMap types.ResourceVersionMap + RvMap types.TransitResourceVersionMap Length uint64 } @@ -25,7 +25,7 @@ type ResponseFromRRM struct { type PullDataFromRRM struct { BatchLength uint64 DefaultCRV uint64 - CRV types.ResourceVersionMap + CRV types.TransitResourceVersionMap } // ToJSON serializes the contents of the collection to JSON From f31cd8c16fbc4edff60996e90072afd7be71e28b Mon Sep 17 00:00:00 2001 From: Yunwen Bai Date: Fri, 1 Jul 2022 02:02:56 -0700 Subject: [PATCH 3/4] update go mod --- go.mod | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go.mod b/go.mod index 814638db..92e72e16 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,13 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-logr/logr v1.2.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect From 4bdb1db62dc43c3c0c3ed0e6091e5c09bffe1134 Mon Sep 17 00:00:00 2001 From: Yunwen Bai Date: Fri, 1 Jul 2022 11:32:23 -0700 Subject: [PATCH 4/4] minor fix --- resource-management/pkg/distributor/storage/nodestore.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index bdb48f6c..a605a5e8 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -77,12 +77,13 @@ func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.TransitResou for _, node := range vs.nodeEventByHash { nodesCopy[index] = node.CopyNode() newRV := node.GetResourceVersion() - if lastRV, isOK := rvs[*node.GetRvLocation()]; isOK { + rvLoc := *node.GetRvLocation() + if lastRV, isOK := rvs[rvLoc]; isOK { if lastRV < newRV { - rvs[*node.GetRvLocation()] = newRV + rvs[rvLoc] = newRV } } else { - rvs[*node.GetRvLocation()] = newRV + rvs[rvLoc] = newRV } index++ } @@ -164,7 +165,7 @@ func (ns *NodeStore) GetCurrentResourceVersions() types.TransitResourceVersionMa for i := 0; i < ns.regionNum; i++ { for j := 0; j < ns.partitionMaxNum; j++ { if ns.currentRVs[i][j] > 0 { - rvMap[types.RvLocation{Region:location.Regions[i], Partition: location.ResourcePartitions[j]}] = ns.currentRVs[i][j] + rvMap[types.RvLocation{Region: location.Regions[i], Partition: location.ResourcePartitions[j]}] = ns.currentRVs[i][j] } } }