From 54bb6cd1697eedf791a2879275dfe84b44b099db Mon Sep 17 00:00:00 2001 From: Fu-Sheng Date: Sun, 4 Sep 2022 16:48:24 +0800 Subject: [PATCH] linear: fix test cases Signed-off-by: Fu-Sheng --- pkg/cache/v3/linear_test.go | 5 +++-- pkg/server/stream/v3/stream.go | 12 ++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index cd822fb0a2..a7656fac50 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -742,6 +742,7 @@ func TestLinearMixedWatches(t *testing.T) { assert.Equal(t, 2, c.NumResources()) sotwState := stream.NewStreamState(false, nil) + sotwState.SetKnownResourceNamesAsList(testType, []string{"a", "b"}) w := make(chan Response, 1) c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) mustBlock(t, w) @@ -754,7 +755,7 @@ func TestLinearMixedWatches(t *testing.T) { err = c.UpdateResources(map[string]types.Resource{"a": a}, nil) assert.NoError(t, err) // This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation - verifyResponse(t, w, c.getVersion(), 1) + verifyResponse(t, w, c.getVersion(), 2) checkVersionMapNotSet(t, c) c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) @@ -775,6 +776,6 @@ func TestLinearMixedWatches(t *testing.T) { assert.NoError(t, err) checkVersionMapSet(t, c) - verifyResponse(t, w, c.getVersion(), 0) + verifyResponse(t, w, c.getVersion(), 1) verifyDeltaResponse(t, wd, nil, []string{"b"}) } diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 093dd46d7d..a73083c7d5 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -41,6 +41,8 @@ type StreamState struct { // nolint:golint,revive // indicates whether the object has been modified since its creation first bool + + mu *sync.RWMutex } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to @@ -95,10 +97,16 @@ func (s *StreamState) IsWildcard() bool { } func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { + s.mu.Lock() + defer s.mu.Unlock() + s.knownResourceNames[url] = names } func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { + s.mu.Lock() + defer s.mu.Unlock() + m := map[string]struct{}{} for _, name := range names { m[name] = struct{}{} @@ -107,6 +115,9 @@ func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { } func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { + s.mu.Lock() + defer s.mu.Unlock() + return s.knownResourceNames[url] } @@ -118,6 +129,7 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St resourceVersions: initialResourceVersions, first: true, knownResourceNames: map[string]map[string]struct{}{}, + mu: &sync.RWMutex{}, } if initialResourceVersions == nil {