Skip to content

Commit

Permalink
linear: one modified, respond all requested resources in streamState
Browse files Browse the repository at this point in the history
  • Loading branch information
fscnick committed Mar 4, 2022
1 parent 83bd3f2 commit f2de42d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 48 deletions.
70 changes: 23 additions & 47 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,29 +143,27 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
}
}

func (cache *LinearCache) notifyAll(modified map[string]struct{}, fromDeletion bool) {
func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
// de-duplicate watches that need to be responded
notifyList := make(map[chan Response][]string)
if !fromDeletion {
for name := range modified {
for watch := range cache.watches[name] {
notifyList[watch] = append(notifyList[watch], name)
}
delete(cache.watches, name)
}
} else {
for deletedName := range modified {
for watch, streamState := range cache.watches[deletedName] {
resourceNames := streamState.GetKnownResourceNames(cache.typeURL)
for resourceName := range resourceNames {
// To avoid the stale in notifyList becomes empty slice.
// Don't skip resource name that has been deleted here.
// It would be filtered out in respond because the corresponding resource has been deleted.
notifyList[watch] = append(notifyList[watch], resourceName)
for name := range modified {
for watch, streamState := range cache.watches[name] {
resourceNames := streamState.GetKnownResourceNames(cache.typeURL)
modifiedNameInResourceName := false
for resourceName := range resourceNames {
if !modifiedNameInResourceName && resourceName == name {
modifiedNameInResourceName = true
}
// To avoid the stale in notifyList becomes empty slice.
// Don't skip resource name that has been deleted here.
// It would be filtered out in respond because the corresponding resource has been deleted.
notifyList[watch] = append(notifyList[watch], resourceName)
}
if !modifiedNameInResourceName {
notifyList[watch] = append(notifyList[watch], name)
}
delete(cache.watches, deletedName)
}
delete(cache.watches, name)
}

for value, stale := range notifyList {
Expand All @@ -189,31 +187,6 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}, fromDeletion b
}
}

func (cache *LinearCache) notifyAllFromDeletion(modified map[string]struct{}) {
notifyList := make(map[chan Response][]string)
for deletedName := range modified {
for watch, streamState := range cache.watches[deletedName] {
names := streamState.GetKnownResourceNames(cache.typeURL)
for name := range names {
if name == deletedName {
// skip the resource name has been deleted.
continue
}
notifyList[watch] = append(notifyList[watch], name)
}
}
delete(cache.watches, deletedName)
}

for value, stale := range notifyList {
cache.respond(value, stale)
}

for value := range cache.watchAll {
cache.respond(value, nil)
}
}

func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse {
resp := createDeltaResponse(context.Background(), request, state, resourceContainer{
resourceMap: cache.resources,
Expand Down Expand Up @@ -246,7 +219,7 @@ func (cache *LinearCache) UpdateResource(name string, res types.Resource) error
cache.resources[name] = res

// TODO: batch watch closures to prevent rapid updates
cache.notifyAll(map[string]struct{}{name: {}}, false)
cache.notifyAll(map[string]struct{}{name: {}})

return nil
}
Expand All @@ -261,7 +234,7 @@ func (cache *LinearCache) DeleteResource(name string) error {
delete(cache.resources, name)

// TODO: batch watch closures to prevent rapid updates
cache.notifyAll(map[string]struct{}{name: {}}, true)
cache.notifyAll(map[string]struct{}{name: {}})
return nil
}

Expand Down Expand Up @@ -293,7 +266,7 @@ func (cache *LinearCache) SetResources(resources map[string]types.Resource) {
modified[name] = struct{}{}
}

cache.notifyAll(modified, false)
cache.notifyAll(modified)
}

// GetResources returns current resources stored in the cache
Expand Down Expand Up @@ -346,7 +319,10 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
// When a resource is removed, its version defaults 0 and it is not considered stale.
if lastVersion < version || (!has && exists) {
stale = true
staleResources = append(staleResources, name)

// Here we collect all requested names.
// It would be filtered out in respond if the resource name doesn't appear in cache.
staleResources = request.ResourceNames
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func TestLinearWatchTwo(t *testing.T) {
mustBlock(t, w1)
require.NoError(t, c.UpdateResource("a", testResource("aa")))
// should only get the modified resource
verifyResponse(t, w, "1", 1)
verifyResponse(t, w, "1", 2)
verifyResponse(t, w1, "1", 2)
}

Expand Down

0 comments on commit f2de42d

Please sign in to comment.