Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Location structure for transit and internal ( i.e. RV maps or the distributor internal logic ) #67

Merged
merged 4 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions resource-management/pkg/aggregrator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type ClientOfRRM struct {
//
type ResponseFromRRM struct {
RegionNodeEvents [][]*event.NodeEvent
RvMap types.ResourceVersionMap
RvMap types.TransitResourceVersionMap
Length uint64
}

Expand All @@ -41,7 +41,7 @@ type ResponseFromRRM struct {
type PullDataFromRRM struct {
BatchLength uint64
DefaultCRV uint64
CRV types.ResourceVersionMap
CRV types.TransitResourceVersionMap
}

const (
Expand Down Expand Up @@ -76,7 +76,7 @@ func (a *Aggregator) Run() (err error) {
klog.V(3).Infof("Existing goroutine for region: %v", a.urls[i])
}()

var crv types.ResourceVersionMap
var crv types.TransitResourceVersionMap
var regionNodeEvents [][]*event.NodeEvent
var length uint64
var eventProcess bool
Expand Down Expand Up @@ -138,7 +138,7 @@ func (a *Aggregator) createClient(url string) *ClientOfRRM {
// or
// Call the resource region manager's SubsequentPull method {url}/resources/subsequentpull when crv is not nil
//
func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64, crv types.ResourceVersionMap) ([][]*event.NodeEvent, uint64) {
func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64, crv types.TransitResourceVersionMap) ([][]*event.NodeEvent, uint64) {
var path string

if len(crv) == 0 {
Expand Down Expand Up @@ -183,7 +183,7 @@ func (a *Aggregator) initPullOrSubsequentPull(c *ClientOfRRM, batchLength uint64
// Call resource region manager's POST method {url}/resources/crv to update the CRV
// error indicate failed POST, CRV means Composite Resource Version
//
func (a *Aggregator) postCRV(c *ClientOfRRM, crv types.ResourceVersionMap) error {
func (a *Aggregator) postCRV(c *ClientOfRRM, crv types.TransitResourceVersionMap) error {
path := httpPrefix + c.BaseURL + "/resources/crv"
bytes, _ := json.Marshal(PullDataFromRRM{CRV: crv.Copy()})
req, err := http.NewRequest(http.MethodPost, path, strings.NewReader((string(bytes))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type Interface interface {
RegisterClient(*types.Client) error

ListNodesForClient(clientId string) ([]*types.LogicalNode, types.ResourceVersionMap, error)
Watch(clientId string, rvs types.ResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error
ProcessEvents(events []*event.NodeEvent) (bool, types.ResourceVersionMap)
ListNodesForClient(clientId string) ([]*types.LogicalNode, types.TransitResourceVersionMap, error)
Watch(clientId string, rvs types.TransitResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error
ProcessEvents(events []*event.NodeEvent) (bool, types.TransitResourceVersionMap)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type NodeStoreStatus struct {
VirtualNodeNumPerRP int

// Latest resource version map
CurrentResourceVerions types.ResourceVersionMap
CurrentResourceVerions types.TransitResourceVersionMap
}

func (nsStatus *NodeStoreStatus) GetKey() string {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package types

import (
"encoding/json"

"global-resource-service/resource-management/pkg/common-lib/types/location"
)

Expand All @@ -10,11 +12,40 @@ type CompositeResourceVersion struct {
ResourceVersion uint64
}

// RvLocation identifies a (region, resource partition) pair using exported
// fields. It is the key type of TransitResourceVersionMap, and it provides
// MarshalText/UnmarshalText so that encoding/json can encode it as a map key.
type RvLocation struct {
// Region is the region component of the location.
Region location.Region
// Partition is the resource partition component of the location.
Partition location.ResourcePartition
}

// MarshalText implements encoding.TextMarshaler by rendering loc as its JSON
// object encoding.
func (loc RvLocation) MarshalText() ([]byte, error) {
	// plain has the same fields as RvLocation but none of its methods, so
	// json.Marshal encodes the fields directly instead of calling back into
	// MarshalText.
	type plain RvLocation

	shadow := plain(loc)
	return json.Marshal(shadow)
}

// UnmarshalText implements encoding.TextUnmarshaler by decoding the JSON
// object in text into loc.
func (loc *RvLocation) UnmarshalText(text []byte) error {
	// plain has the same fields as RvLocation but none of its methods, so
	// json.Unmarshal fills the fields directly instead of calling back into
	// UnmarshalText.
	type plain RvLocation

	target := (*plain)(loc)
	return json.Unmarshal(text, target)
}

// Map from (regionId, ResourcePartitionId) to resourceVersion
type ResourceVersionMap map[location.Location]uint64
// used in REST API calls
type TransitResourceVersionMap map[RvLocation]uint64
yb01 marked this conversation as resolved.
Show resolved Hide resolved

// InternalResourceVersionMap is used internally by the event queue that serves WATCH of nodes
type InternalResourceVersionMap map[location.Location]uint64

// ConvertToInternalResourceVersionMap translates a TransitResourceVersionMap
// (keyed by RvLocation) into an InternalResourceVersionMap (keyed by
// location.Location), carrying each resource version over unchanged.
//
// A nil or empty input yields an empty, non-nil map.
func ConvertToInternalResourceVersionMap(rvs TransitResourceVersionMap) InternalResourceVersionMap {
	// The result holds at most one entry per input entry, so size it up front
	// to avoid incremental map growth.
	internalMap := make(InternalResourceVersionMap, len(rvs))

	for rvLoc, rv := range rvs {
		// NOTE(review): the pointer returned by location.NewLocation is
		// dereferenced unconditionally; confirm it can never return nil.
		internalMap[*location.NewLocation(rvLoc.Region, rvLoc.Partition)] = rv
	}

	return internalMap
}

func (rvs *ResourceVersionMap) Copy() ResourceVersionMap {
dupRVs := make(ResourceVersionMap, len(*rvs))
func (rvs *TransitResourceVersionMap) Copy() TransitResourceVersionMap {
dupRVs := make(TransitResourceVersionMap, len(*rvs))
for loc, rv := range *rvs {
dupRVs[loc] = rv
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ import (
)

func TestResourceVersionMap_Marshall_UnMarshall(t *testing.T) {
rvs := make(ResourceVersionMap)
loc := location.NewLocation(location.Beijing, location.ResourcePartition1)
rvs[*loc] = 100
rvs := make(TransitResourceVersionMap)
loc := RvLocation{Region: location.Beijing, Partition: location.ResourcePartition1}
rvs[loc] = 100

// marshall
b, err := json.Marshal(rvs)
assert.Nil(t, err)
assert.NotNil(t, b)

// unmarshall
var newRVMap ResourceVersionMap
var newRVMap TransitResourceVersionMap
err = json.Unmarshal(b, &newRVMap)
assert.Nil(t, err)
assert.NotNil(t, newRVMap)
assert.Equal(t, 1, len(newRVMap))
assert.Equal(t, uint64(100), newRVMap[*loc])
assert.Equal(t, uint64(100), newRVMap[loc])
}
13 changes: 1 addition & 12 deletions resource-management/pkg/common-lib/types/location/location.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package location

import (
"encoding/json"
"fmt"
)

Expand Down Expand Up @@ -237,7 +236,7 @@ func GetRPNum() int {
return len(ResourcePartitions)
}

// TODO - read resource parition from configuration or metadata server
// TODO - read resource partition from configuration or metadata server
func GetRPsForRegion(region Region) []ResourcePartition {
rpsForRegion := make([]ResourcePartition, len(ResourcePartitions))
for i := 0; i < len(ResourcePartitions); i++ {
Expand Down Expand Up @@ -266,13 +265,3 @@ func (loc *Location) Equal(locToCompare Location) bool {
// String renders the location as "[Region <region>, ResoucePartition <partition>]".
// NOTE(review): "ResoucePartition" in the format string looks like a misspelling
// of "ResourcePartition"; it is left untouched here because logs or callers may
// depend on the exact text — confirm before correcting.
func (loc *Location) String() string {
return fmt.Sprintf("[Region %s, ResoucePartition %s]", loc.region, loc.partition)
}

func (loc Location) MarshalText() (text []byte, err error) {
type l Location
return json.Marshal(l(loc))
}

func (loc *Location) UnmarshalText(text []byte) error {
type l Location
return json.Unmarshal(text, (*l)(loc))
}
6 changes: 3 additions & 3 deletions resource-management/pkg/distributor/cache/eventqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func (eq *NodeEventQueue) EnqueueEvent(e *node.ManagedNodeEvent) {
queueByLoc.enqueueEvent(e)
}

func (eq *NodeEventQueue) Watch(rvs types.ResourceVersionMap, clientWatchChan chan *event.NodeEvent, stopCh chan struct{}) error {
func (eq *NodeEventQueue) Watch(rvs types.InternalResourceVersionMap, clientWatchChan chan *event.NodeEvent, stopCh chan struct{}) error {
if eq.watchChan != nil {
return errors.New("Currently only support one watcher per node event queue.")
return errors.New("currently only support one watcher per node event queue")
}

// get events already in queues
Expand Down Expand Up @@ -182,7 +182,7 @@ func (eq *NodeEventQueue) Watch(rvs types.ResourceVersionMap, clientWatchChan ch
return nil
}

func (eq *NodeEventQueue) getAllEventsSinceResourceVersion(rvs types.ResourceVersionMap) ([]*event.NodeEvent, error) {
func (eq *NodeEventQueue) getAllEventsSinceResourceVersion(rvs types.InternalResourceVersionMap) ([]*event.NodeEvent, error) {
locStartPostitions := make(map[location.Location]int)

for loc, rv := range rvs {
Expand Down
16 changes: 10 additions & 6 deletions resource-management/pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ func (dis *ResourceDistributor) getSortedVirtualStores(stores map[*storage.Virtu
return sortedStores
}

func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.LogicalNode, types.ResourceVersionMap, error) {
// ListNodesForClient returns the list of nodes for a client request together with an RV map
// the client can use for WATCH, or the error encountered during node allocation/listing for the client
func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.LogicalNode, types.TransitResourceVersionMap, error) {
if clientId == "" {
return nil, nil, errors.New("Empty clientId")
}
Expand All @@ -187,7 +189,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo

eventQueue.AcquireSnapshotRLock()
nodesByStore := make([][]*types.LogicalNode, len(assignedStores))
rvMapByStore := make([]types.ResourceVersionMap, len(assignedStores))
rvMapByStore := make([]types.TransitResourceVersionMap, len(assignedStores))
hostCount := 0
for i := 0; i < len(assignedStores); i++ {
nodesByStore[i], rvMapByStore[i] = assignedStores[i].SnapShot()
Expand All @@ -206,7 +208,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo
}

// combine to single ResourceVersionMap
finalRVs := make(types.ResourceVersionMap)
finalRVs := make(types.TransitResourceVersionMap)
for i := 0; i < len(rvMapByStore); i++ {
currentRVs := rvMapByStore[i]
for loc, rv := range currentRVs {
Expand All @@ -223,7 +225,7 @@ func (dis *ResourceDistributor) ListNodesForClient(clientId string) ([]*types.Lo
return nodes, finalRVs, nil
}

func (dis *ResourceDistributor) Watch(clientId string, rvs types.ResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error {
func (dis *ResourceDistributor) Watch(clientId string, rvs types.TransitResourceVersionMap, watchChan chan *event.NodeEvent, stopCh chan struct{}) error {
var nodeEventQueue *cache.NodeEventQueue
var isOK bool
if nodeEventQueue, isOK = dis.nodeEventQueueMap[clientId]; !isOK || nodeEventQueue == nil {
Expand All @@ -239,10 +241,12 @@ func (dis *ResourceDistributor) Watch(clientId string, rvs types.ResourceVersion
return errors.New("Stop watch channel not provided")
}

return nodeEventQueue.Watch(rvs, watchChan, stopCh)
internal_rvs := types.ConvertToInternalResourceVersionMap(rvs)

return nodeEventQueue.Watch(internal_rvs, watchChan, stopCh)
}

func (dis *ResourceDistributor) ProcessEvents(events []*event.NodeEvent) (bool, types.ResourceVersionMap) {
func (dis *ResourceDistributor) ProcessEvents(events []*event.NodeEvent) (bool, types.TransitResourceVersionMap) {
eventsToProcess := make([]*node.ManagedNodeEvent, len(events))
for i := 0; i < len(events); i++ {
if events[i] != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) {
}

// client list nodes
latestRVsByClient := make([]types.ResourceVersionMap, tt.clientNum)
latestRVsByClient := make([]types.TransitResourceVersionMap, tt.clientNum)
nodesByClient := make([][]*types.LogicalNode, tt.clientNum)
for i := 0; i < tt.clientNum; i++ {
clientId := clientIds[i]
Expand All @@ -97,9 +97,9 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) {
// check each node event
nodeIds := make(map[string]bool)
for _, node := range nodes {
nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition))
nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)}
assert.NotNil(t, nodeLoc)
assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64())
assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64())
if _, isOK := nodeIds[node.Id]; isOK {
assert.Fail(t, "List nodes cannot have more than one copy of a node")
} else {
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) {
duration += time.Since(start)

// client list nodes
latestRVsByClient := make([]types.ResourceVersionMap, tt.clientNum)
latestRVsByClient := make([]types.TransitResourceVersionMap, tt.clientNum)
nodesByClient := make([][]*types.LogicalNode, tt.clientNum)
wg.Add(tt.clientNum)

Expand All @@ -282,9 +282,9 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) {
// check each node event
nodeIds := make(map[string]bool)
for _, node := range nodes {
nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition))
nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)}
assert.NotNil(t, nodeLoc)
assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64())
assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64())
if _, isOK := nodeIds[node.Id]; isOK {
assert.Fail(t, "List nodes cannot have more than one copy of a node")
} else {
Expand Down
12 changes: 7 additions & 5 deletions resource-management/pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,9 @@ func TestRegistrationWorkflow(t *testing.T) {
// check each node event
nodeIds := make(map[string]bool)
for _, node := range nodes {
nodeLoc := location.NewLocation(location.Region(node.GeoInfo.Region), location.ResourcePartition(node.GeoInfo.ResourcePartition))
nodeLoc := types.RvLocation{Region: location.Region(node.GeoInfo.Region), Partition: location.ResourcePartition (node.GeoInfo.ResourcePartition)}
assert.NotNil(t, nodeLoc)
assert.True(t, latestRVs[*nodeLoc] >= node.GetResourceVersionInt64())
assert.True(t, latestRVs[nodeLoc] >= node.GetResourceVersionInt64())
if _, isOK := nodeIds[node.Id]; isOK {
assert.Fail(t, "List nodes cannot have more than one copy of a node")
} else {
Expand All @@ -355,8 +355,9 @@ func TestRegistrationWorkflow(t *testing.T) {
updateNodeEvents := generatedUpdateNodeEventsFromNodeList(nodes)
result2, rvMap2 := distributor.ProcessEvents(updateNodeEvents)
assert.True(t, result2, "Expecting update nodes successfully")
loc := location.NewLocation(location.Region(nodes[0].GeoInfo.Region), location.ResourcePartition(nodes[0].GeoInfo.ResourcePartition))
assert.Equal(t, uint64(rvToGenerate), rvMap2[*loc])
loc := types.RvLocation{Region: location.Region(nodes[0].GeoInfo.Region), Partition: location.ResourcePartition (nodes[0].GeoInfo.ResourcePartition)}

assert.Equal(t, uint64(rvToGenerate), rvMap2[loc])
assert.Equal(t, oldNodeRV, nodes[0].GetResourceVersionInt64(), "Expecting listed nodes are snapshoted and cannot be affected by update")

// client watch node update
Expand All @@ -370,7 +371,8 @@ func TestRegistrationWorkflow(t *testing.T) {
watchedEventCount := 0
for e := range watchCh {
assert.Equal(t, event.Modified, e.Type)
nodeLoc := location.NewLocation(location.Region(e.Node.GeoInfo.Region), location.ResourcePartition(e.Node.GeoInfo.ResourcePartition))
nodeLoc := types.RvLocation{Region: location.Region(e.Node.GeoInfo.Region), Partition: location.ResourcePartition (e.Node.GeoInfo.ResourcePartition)}

assert.Equal(t, loc, nodeLoc)
watchedEventCount++

Expand Down
5 changes: 5 additions & 0 deletions resource-management/pkg/distributor/node/managedNodeEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (n *ManagedNodeEvent) GetLocation() *location.Location {
return n.loc
}

// GetRvLocation returns a newly built types.RvLocation holding the region and
// resource partition reported by this event's location.
func (n *ManagedNodeEvent) GetRvLocation() *types.RvLocation {
	rvLoc := types.RvLocation{
		Region:    n.loc.GetRegion(),
		Partition: n.loc.GetResourcePartition(),
	}
	return &rvLoc
}

func (n *ManagedNodeEvent) GetResourceVersion() uint64 {
rv, err := strconv.ParseUint(n.nodeEvent.Node.ResourceVersion, 10, 64)
if err != nil {
Expand Down
20 changes: 11 additions & 9 deletions resource-management/pkg/distributor/storage/nodestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,23 @@ func (vs *VirtualNodeStore) GetRange() (float64, float64) {
return vs.lowerbound, vs.upperbound
}

func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.ResourceVersionMap) {
// SnapShot generates the list of nodes for a client's List() call, along with the current RV map to return to the client
func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.TransitResourceVersionMap) {
vs.mu.RLock()
defer vs.mu.RUnlock()
nodesCopy := make([]*types.LogicalNode, len(vs.nodeEventByHash))
index := 0
rvs := make(types.ResourceVersionMap)
rvs := make(types.TransitResourceVersionMap)
for _, node := range vs.nodeEventByHash {
nodesCopy[index] = node.CopyNode()
newRV := node.GetResourceVersion()
if lastRV, isOK := rvs[*node.GetLocation()]; isOK {
rvLoc := *node.GetRvLocation()
if lastRV, isOK := rvs[rvLoc]; isOK {
if lastRV < newRV {
rvs[*node.GetLocation()] = newRV
rvs[rvLoc] = newRV
}
} else {
rvs[*node.GetLocation()] = newRV
rvs[rvLoc] = newRV
}
index++
}
Expand Down Expand Up @@ -156,14 +158,14 @@ func NewNodeStore(vNodeNumPerRP int, regionNum int, partitionMaxNum int) *NodeSt
}

// TODO - verify whether the original value can be changed. If so, return a deepcopy
func (ns *NodeStore) GetCurrentResourceVersions() types.ResourceVersionMap {
func (ns *NodeStore) GetCurrentResourceVersions() types.TransitResourceVersionMap {
ns.rvLock.RLock()
defer ns.rvLock.RUnlock()
rvMap := make(types.ResourceVersionMap)
rvMap := make(types.TransitResourceVersionMap)
for i := 0; i < ns.regionNum; i++ {
for j := 0; j < ns.partitionMaxNum; j++ {
if ns.currentRVs[i][j] > 0 {
rvMap[*location.NewLocation(location.Regions[i], location.ResourcePartitions[j])] = ns.currentRVs[i][j]
rvMap[types.RvLocation{Region: location.Regions[i], Partition: location.ResourcePartitions[j]}] = ns.currentRVs[i][j]
}
}
}
Expand Down Expand Up @@ -245,7 +247,7 @@ func (ns *NodeStore) UpdateNode(nodeEvent *node.ManagedNodeEvent) {
func (ns NodeStore) DeleteNode(nodeEvent event.NodeEvent) {
}

func (ns *NodeStore) ProcessNodeEvents(nodeEvents []*node.ManagedNodeEvent, persistHelper *DistributorPersistHelper) (bool, types.ResourceVersionMap) {
func (ns *NodeStore) ProcessNodeEvents(nodeEvents []*node.ManagedNodeEvent, persistHelper *DistributorPersistHelper) (bool, types.TransitResourceVersionMap) {
persistHelper.SetWaitCount(len(nodeEvents))
for _, e := range nodeEvents {
if e == nil {
Expand Down
Loading