Skip to content

Commit

Permalink
fix(kds): fix retry on NACK and add backoff (#9736)
Browse files Browse the repository at this point in the history
Signed-off-by: slonka <[email protected]>
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
Co-authored-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
2 people authored and kumahq[bot] committed Apr 8, 2024
1 parent 811da17 commit 0dc9faf
Show file tree
Hide file tree
Showing 10 changed files with 566 additions and 37 deletions.
5 changes: 5 additions & 0 deletions pkg/events/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type ResourceChangedEvent struct {
Key model.ResourceKey
}

// TriggerKDSResyncEvent is an event payload that pairs a resource type with
// the ID of a node.
// NOTE(review): judging by the name, it is emitted to request that KDS
// re-sends resources of Type to the node identified by NodeID — confirm
// against the code that publishes and consumes it.
type TriggerKDSResyncEvent struct {
// Type is the resource type the event refers to.
Type model.ResourceType
// NodeID is the ID of the node the event refers to.
NodeID string
}

var ListenerStoppedErr = errors.New("listener closed")

type Listener interface {
Expand Down
11 changes: 11 additions & 0 deletions pkg/kds/v2/reconcile/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,19 @@ import (

// Reconciler re-computes configuration for a given node.
type Reconciler interface {
<<<<<<< HEAD

Check failure on line 12 in pkg/kds/v2/reconcile/interfaces.go

View workflow job for this annotation

GitHub Actions / lint

expected '}', found '<<' (typecheck)
Reconcile(context.Context, *envoy_core.Node) error
Clear(*envoy_core.Node)
=======
// Reconcile reconciles state of node given changed resource types.
// Returns error and bool which is true if any resource was changed.
Reconcile(context.Context, *envoy_core.Node, map[model.ResourceType]struct{}, logr.Logger) (error, bool)
// ForceVersion marks that resource type for a node ID will obtain a new version even if nothing changes.
// Note that it does not change snapshot, for this to actually apply on Envoy, we need to call Reconcile.
// It's not called immediately to avoid parallel Reconcile calls for the same node.
ForceVersion(node *envoy_core.Node, resourceType model.ResourceType)
Clear(context.Context, *envoy_core.Node) error
>>>>>>> 4752f7b82 (fix(kds): fix retry on NACK and add backoff (#9736))

Check failure on line 24 in pkg/kds/v2/reconcile/interfaces.go

View workflow job for this annotation

GitHub Actions / lint

illegal character U+0023 '#' (typecheck)
}

// Generates a snapshot of xDS resources for a given node.
Expand Down
11 changes: 11 additions & 0 deletions pkg/kds/v2/reconcile/reconcile_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package reconcile_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

// TestReconcile is the `go test` entry point for this package: it hands the
// testing.T over to the project's spec runner under the suite name below.
func TestReconcile(t *testing.T) {
	const suiteName = "Reconcile Suite"
	test.RunSpecs(t, suiteName)
}
123 changes: 109 additions & 14 deletions pkg/kds/v2/reconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package reconcile

import (
"context"
"errors"
"sync"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
<<<<<<< HEAD

Check failure on line 9 in pkg/kds/v2/reconcile/reconciler.go

View workflow job for this annotation

GitHub Actions / lint

missing import path (typecheck)
"google.golang.org/protobuf/proto"
=======

Check failure on line 11 in pkg/kds/v2/reconcile/reconciler.go

View workflow job for this annotation

GitHub Actions / lint

missing import path (typecheck)
"github.com/go-logr/logr"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
>>>>>>> 4752f7b82 (fix(kds): fix retry on NACK and add backoff (#9736))

Check failure on line 15 in pkg/kds/v2/reconcile/reconciler.go

View workflow job for this annotation

GitHub Actions / lint

missing import path (typecheck)

config_core "github.com/kumahq/kuma/pkg/config/core"
"github.com/kumahq/kuma/pkg/core"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
cache_v2 "github.com/kumahq/kuma/pkg/kds/v2/cache"
util_kds_v2 "github.com/kumahq/kuma/pkg/kds/v2/util"
Expand All @@ -33,6 +36,11 @@ func NewReconciler(
generator: generator,
mode: mode,
statsCallbacks: statsCallbacks,
<<<<<<< HEAD

Check failure on line 39 in pkg/kds/v2/reconcile/reconciler.go

View workflow job for this annotation

GitHub Actions / lint

expected operand, found '<<' (typecheck)
=======
tenants: tenants,
forceVersions: map[string][]core_model.ResourceType{},
>>>>>>> 4752f7b82 (fix(kds): fix retry on NACK and add backoff (#9736))

Check failure on line 43 in pkg/kds/v2/reconcile/reconciler.go

View workflow job for this annotation

GitHub Actions / lint

illegal character U+0023 '#' (typecheck)
}
}

Expand All @@ -44,6 +52,16 @@ type reconciler struct {
statsCallbacks xds.StatsCallbacks

lock sync.Mutex

forceVersions map[string][]core_model.ResourceType
forceVersionsLock sync.RWMutex
}

// ForceVersion queues resourceType in forceVersions under the hashed ID of
// node. It only records the request; no snapshot is read or modified here.
// Repeated calls for the same node and type append repeated entries.
func (r *reconciler) ForceVersion(node *envoy_core.Node, resourceType core_model.ResourceType) {
	// The ID is derived before taking the lock so the critical section only
	// covers the map update.
	key := r.hasher.ID(node)

	r.forceVersionsLock.Lock()
	defer r.forceVersionsLock.Unlock()

	pending := r.forceVersions[key]
	r.forceVersions[key] = append(pending, resourceType)
}

func (r *reconciler) Clear(node *envoy_core.Node) {
Expand Down Expand Up @@ -73,6 +91,7 @@ func (r *reconciler) Reconcile(ctx context.Context, node *envoy_core.Node) error
}
id := r.hasher.ID(node)
old, _ := r.cache.GetSnapshot(id)
<<<<<<< HEAD
new = r.Version(new, old)
r.logChanges(new, old, node)
r.meterConfigReadyForDelivery(new, old)
Expand All @@ -84,12 +103,60 @@ func (r *reconciler) Version(new, old envoy_cache.ResourceSnapshot) envoy_cache.
return nil
}
newResources := map[core_model.ResourceType]envoy_cache.Resources{}
=======

// construct builder with unchanged types from the old snapshot
builder := cache_v2.NewSnapshotBuilder()
if old != nil {
for _, typ := range util_kds_v2.GetSupportedTypes() {
resType := core_model.ResourceType(typ)
if _, ok := changedTypes[resType]; ok {
continue
}

oldRes := old.GetResources(typ)
if len(oldRes) > 0 {
builder = builder.With(resType, maps.Values(oldRes))
}
}
}

new, err := r.generator.GenerateSnapshot(ctx, node, builder, changedTypes)
if err != nil {
return err, false
}
if new == nil {
return errors.New("nil snapshot"), false
}
// call ConstructVersionMap, so we can override versions if needed and compute what changed
if old != nil {
// this should already be computed by SetSnapshot, but we call it just to make sure we have versions.
if err := old.ConstructVersionMap(); err != nil {
return errors.Wrap(err, "could not construct version map"), false
}
}
if err := new.ConstructVersionMap(); err != nil {
return errors.Wrap(err, "could not construct version map"), false
}
r.forceNewVersion(new, id)

if changed := r.changedTypes(old, new); len(changed) > 0 {
r.logChanges(logger, changed, node)
r.meterConfigReadyForDelivery(changed, node.Id)
return r.cache.SetSnapshot(ctx, id, new), true
}
return nil, false
}

func (r *reconciler) changedTypes(old, new envoy_cache.ResourceSnapshot) []core_model.ResourceType {
var changed []core_model.ResourceType
>>>>>>> 4752f7b82 (fix(kds): fix retry on NACK and add backoff (#9736))
for _, typ := range util_kds_v2.GetSupportedTypes() {
version := new.GetVersion(typ)
if version != "" {
// favor a version assigned by resource generator
continue
if (old == nil && len(new.GetVersionMap(typ)) > 0) ||
(old != nil && !maps.Equal(old.GetVersionMap(typ), new.GetVersionMap(typ))) {
changed = append(changed, core_model.ResourceType(typ))
}
<<<<<<< HEAD

if old != nil && r.equal(new.GetResources(typ), old.GetResources(typ)) {
version = old.GetVersion(typ)
Expand All @@ -113,16 +180,24 @@ func (r *reconciler) Version(new, old envoy_cache.ResourceSnapshot) envoy_cache.
return &cache_v2.Snapshot{
Resources: newResources,
}
=======
}
return changed
>>>>>>> 4752f7b82 (fix(kds): fix retry on NACK and add backoff (#9736))
}

func (_ *reconciler) equal(new, old map[string]envoy_types.Resource) bool {
if len(new) != len(old) {
return false
}
for key, newValue := range new {
if oldValue, hasOldValue := old[key]; !hasOldValue || !proto.Equal(newValue, oldValue) {
return false
// see kdsRetryForcer for more information
func (r *reconciler) forceNewVersion(snapshot envoy_cache.ResourceSnapshot, id string) {
r.forceVersionsLock.Lock()
forceVersionsForTypes := r.forceVersions[id]
delete(r.forceVersions, id)
r.forceVersionsLock.Unlock()
for _, typ := range forceVersionsForTypes {
cacheSnapshot, ok := snapshot.(*cache_v2.Snapshot)
if !ok {
panic("invalid type of Snapshot")
}
<<<<<<< HEAD
}
return true
}
Expand All @@ -136,14 +211,34 @@ func (r *reconciler) logChanges(new envoy_cache.ResourceSnapshot, old envoy_cach
client = "global"
}
log.Info("detected changes in the resources. Sending changes to the client.", "resourceType", typ, "client", client)
=======
for resourceName := range cacheSnapshot.VersionMap[typ] {
cacheSnapshot.VersionMap[typ][resourceName] = ""
>>>>>>> 4752f7b82 (fix(kds): fix retry on NACK and add backoff (#9736))
}
}
}

<<<<<<< HEAD
func (r *reconciler) meterConfigReadyForDelivery(new envoy_cache.ResourceSnapshot, old envoy_cache.ResourceSnapshot) {
for _, typ := range util_kds_v2.GetSupportedTypes() {
if old == nil || old.GetVersion(typ) != new.GetVersion(typ) {
r.statsCallbacks.ConfigReadyForDelivery(new.GetVersion(typ))
=======
func (r *reconciler) logChanges(logger logr.Logger, changedTypes []core_model.ResourceType, node *envoy_core.Node) {
for _, typ := range changedTypes {
client := node.Id
if r.mode == config_core.Zone {
// we need to override client name because Zone is always a client to Global (on gRPC level)
client = "global"
>>>>>>> 4752f7b82 (fix(kds): fix retry on NACK and add backoff (#9736))
}
logger.Info("detected changes in the resources. Sending changes to the client.", "resourceType", typ, "client", client) // todo is client needed?
}
}

// meterConfigReadyForDelivery notifies the stats callbacks once per changed
// resource type. The value passed to ConfigReadyForDelivery is nodeID
// immediately followed by the type name, with no separator.
func (r *reconciler) meterConfigReadyForDelivery(changedTypes []core_model.ResourceType, nodeID string) {
	for i := range changedTypes {
		configID := nodeID + string(changedTypes[i])
		r.statsCallbacks.ConfigReadyForDelivery(configID)
	}
}
96 changes: 96 additions & 0 deletions pkg/kds/v2/reconcile/reconciler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package reconcile_test

import (
"context"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

config_core "github.com/kumahq/kuma/pkg/config/core"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/kds/v2/reconcile"
"github.com/kumahq/kuma/pkg/kds/v2/server"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/multitenant"
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
"github.com/kumahq/kuma/pkg/test/resources/samples"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
)

// Specs for the KDS v2 reconciler. Each spec runs against an in-memory
// resource store and a go-control-plane snapshot cache, and inspects the
// snapshot stored in that cache after calling Reconcile.
var _ = Describe("Reconciler", func() {
var reconciler reconcile.Reconciler
var store core_store.ResourceStore
var snapshotCache envoy_cache.SnapshotCache

// The same node (ID "a") is used by every spec.
node := &envoy_core.Node{
Id: "a",
}
// Every Reconcile call below reports Mesh as the only changed type.
changedTypes := map[core_model.ResourceType]struct{}{
core_mesh.MeshType: {},
}

// Build a fresh store, snapshot cache and reconciler before each spec so
// that specs do not observe each other's state.
BeforeEach(func() {
store = memory.NewStore()
generator := reconcile.NewSnapshotGenerator(core_manager.NewResourceManager(store), reconcile.Any, reconcile.NoopResourceMapper)
hasher := &server.Hasher{}
snapshotCache = envoy_cache.NewSnapshotCache(false, hasher, util_xds.NewLogger(logr.Discard()))
metrics, err := core_metrics.NewMetrics("zone-1")
Expect(err).ToNot(HaveOccurred())
statsCallbacks, err := util_xds.NewStatsCallbacks(metrics, "kds_delta", util_xds.NoopVersionExtractor)
Expect(err).ToNot(HaveOccurred())
reconciler = reconcile.NewReconciler(hasher, snapshotCache, generator, config_core.Zone, statsCallbacks, multitenant.SingleTenant)
})

It("should reconcile snapshot in snapshot cache", func() {
// given
Expect(store.Create(context.Background(), samples.MeshDefault(), core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh))).To(Succeed())

// when
err, changed := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeTrue())
snapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
Expect(snapshot.GetResources(string(core_mesh.MeshType))).To(HaveLen(1))

// when reconciled again without resource changes
err, changed = reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeFalse())
newSnapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
// identity check: the cache must still return the snapshot obtained
// after the first Reconcile, i.e. it was not replaced
Expect(snapshot).To(BeIdenticalTo(newSnapshot))
})

It("should force new version in snapshot cache", func() {
// given
Expect(store.Create(context.Background(), samples.MeshDefault(), core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh))).To(Succeed())

// first Reconcile populates the cache; its snapshot is the baseline
err, _ := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())
Expect(err).ToNot(HaveOccurred())
snapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())

// when
reconciler.ForceVersion(node, core_mesh.MeshType)
err, changed := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeTrue())
newSnapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
// the Mesh resources are the same as in the baseline, but the version
// map for Mesh differs from the baseline
Expect(snapshot.GetResources(string(core_mesh.MeshType))).To(Equal(newSnapshot.GetResources(string(core_mesh.MeshType))))
Expect(snapshot.GetVersionMap(string(core_mesh.MeshType))).ToNot(Equal(newSnapshot.GetVersionMap(string(core_mesh.MeshType))))
})
})
15 changes: 12 additions & 3 deletions pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func New(
util_xds_v3.AdaptDeltaCallbacks(util_xds.LoggingCallbacks{Log: log}),
util_xds_v3.AdaptDeltaCallbacks(statsCallbacks),
// util_xds_v3.AdaptDeltaCallbacks(NewNackBackoff(nackBackoff)),
newKdsRetryForcer(log, cache, hasher),
newKdsRetryForcer(log, reconciler.ForceVersion, nackBackoff, rt.EventBus()),
syncTracker,
}
if insight {
Expand Down Expand Up @@ -117,13 +117,22 @@ func newSyncTracker(log logr.Logger, reconciler reconcile_v2.Reconciler, refresh
}

func newKDSContext(log logr.Logger) (envoy_cache.NodeHash, envoy_cache.SnapshotCache) { //nolint:unparam
hasher := hasher{}
hasher := Hasher{}
logger := util_xds.NewLogger(log)
return hasher, envoy_cache.NewSnapshotCache(false, hasher, logger)
}

type hasher struct{}
type Hasher struct{}

<<<<<<< HEAD
func (_ hasher) ID(node *envoy_core.Node) string {
return node.Id
=======
func (_ Hasher) ID(node *envoy_core.Node) string {
tenantID, found := util.TenantFromMetadata(node)
if !found {
return node.Id
}
return node.Id + ":" + tenantID
>>>>>>> 4752f7b82 (fix(kds): fix retry on NACK and add backoff (#9736))
}
Loading

0 comments on commit 0dc9faf

Please sign in to comment.