Skip to content

Commit

Permalink
linear: notifyAll could distinguish from deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
fscnick committed Mar 4, 2022
1 parent dbfed69 commit 83bd3f2
Showing 1 changed file with 52 additions and 11 deletions.
63 changes: 52 additions & 11 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

type watches = map[chan Response]struct{}
type watches = map[chan Response]stream.StreamState

// LinearCache supports collections of opaque resources. This cache has a
// single collection indexed by resource names and manages resource versions
Expand Down Expand Up @@ -143,15 +143,31 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
}
}

func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
func (cache *LinearCache) notifyAll(modified map[string]struct{}, fromDeletion bool) {
// de-duplicate watches that need to be responded
notifyList := make(map[chan Response][]string)
for name := range modified {
for watch := range cache.watches[name] {
notifyList[watch] = append(notifyList[watch], name)
if !fromDeletion {
for name := range modified {
for watch := range cache.watches[name] {
notifyList[watch] = append(notifyList[watch], name)
}
delete(cache.watches, name)
}
} else {
for deletedName := range modified {
for watch, streamState := range cache.watches[deletedName] {
resourceNames := streamState.GetKnownResourceNames(cache.typeURL)
for resourceName := range resourceNames {
// To avoid the stale in notifyList becomes empty slice.
// Don't skip resource name that has been deleted here.
// It would be filtered out in respond because the corresponding resource has been deleted.
notifyList[watch] = append(notifyList[watch], resourceName)
}
}
delete(cache.watches, deletedName)
}
delete(cache.watches, name)
}

for value, stale := range notifyList {
cache.respond(value, stale)
}
Expand All @@ -173,6 +189,31 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
}
}

func (cache *LinearCache) notifyAllFromDeletion(modified map[string]struct{}) {
notifyList := make(map[chan Response][]string)
for deletedName := range modified {
for watch, streamState := range cache.watches[deletedName] {
names := streamState.GetKnownResourceNames(cache.typeURL)
for name := range names {
if name == deletedName {
// skip the resource name has been deleted.
continue
}
notifyList[watch] = append(notifyList[watch], name)
}
}
delete(cache.watches, deletedName)
}

for value, stale := range notifyList {
cache.respond(value, stale)
}

for value := range cache.watchAll {
cache.respond(value, nil)
}
}

func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse {
resp := createDeltaResponse(context.Background(), request, state, resourceContainer{
resourceMap: cache.resources,
Expand Down Expand Up @@ -205,7 +246,7 @@ func (cache *LinearCache) UpdateResource(name string, res types.Resource) error
cache.resources[name] = res

// TODO: batch watch closures to prevent rapid updates
cache.notifyAll(map[string]struct{}{name: {}})
cache.notifyAll(map[string]struct{}{name: {}}, false)

return nil
}
Expand All @@ -220,7 +261,7 @@ func (cache *LinearCache) DeleteResource(name string) error {
delete(cache.resources, name)

// TODO: batch watch closures to prevent rapid updates
cache.notifyAll(map[string]struct{}{name: {}})
cache.notifyAll(map[string]struct{}{name: {}}, true)
return nil
}

Expand Down Expand Up @@ -252,7 +293,7 @@ func (cache *LinearCache) SetResources(resources map[string]types.Resource) {
modified[name] = struct{}{}
}

cache.notifyAll(modified)
cache.notifyAll(modified, false)
}

// GetResources returns current resources stored in the cache
Expand Down Expand Up @@ -315,7 +356,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
}
// Create open watches since versions are up to date.
if len(request.ResourceNames) == 0 {
cache.watchAll[value] = struct{}{}
cache.watchAll[value] = streamState
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
Expand All @@ -328,7 +369,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
set = make(watches)
cache.watches[name] = set
}
set[value] = struct{}{}
set[value] = streamState
}
return func() {
cache.mu.Lock()
Expand Down

0 comments on commit 83bd3f2

Please sign in to comment.