Skip to content

Commit

Permalink
fix(kds): fix retry on NACK and add backoff (backport of #9736) (#9858)
Browse files Browse the repository at this point in the history
Co-authored-by: Krzysztof Słonka <[email protected]>
Co-authored-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
3 people authored Apr 11, 2024
1 parent 2a7e501 commit 99ee1a4
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 118 deletions.
5 changes: 5 additions & 0 deletions pkg/events/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type TriggerInsightsComputationEvent struct {
TenantID string
}

type TriggerKDSResyncEvent struct {
Type model.ResourceType
NodeID string
}

var ListenerStoppedErr = errors.New("listener closed")

type Listener interface {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kds/v2/reconcile/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type Reconciler interface {
// Reconcile reconciles state of node given changed resource types.
// Returns error and bool which is true if any resource was changed.
Reconcile(context.Context, *envoy_core.Node, map[model.ResourceType]struct{}, logr.Logger) (error, bool)
// ForceVersion marks that resource type for a node ID will obtain a new version even if nothing changes.
// Note that it does not change snapshot, for this to actually apply on Envoy, we need to call Reconcile.
// It's not called immediately to avoid parallel Reconcile calls for the same node.
ForceVersion(node *envoy_core.Node, resourceType model.ResourceType)
Clear(context.Context, *envoy_core.Node) error
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/kds/v2/reconcile/reconcile_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package reconcile_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

func TestReconcile(t *testing.T) {
test.RunSpecs(t, "Reconcile Suite")
}
114 changes: 55 additions & 59 deletions pkg/kds/v2/reconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@ package reconcile

import (
"context"
"errors"
"sync"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

config_core "github.com/kumahq/kuma/pkg/config/core"
"github.com/kumahq/kuma/pkg/core"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
cache_v2 "github.com/kumahq/kuma/pkg/kds/v2/cache"
util_kds_v2 "github.com/kumahq/kuma/pkg/kds/v2/util"
Expand All @@ -29,6 +26,7 @@ func NewReconciler(hasher envoy_cache.NodeHash, cache envoy_cache.SnapshotCache,
mode: mode,
statsCallbacks: statsCallbacks,
tenants: tenants,
forceVersions: map[string][]core_model.ResourceType{},
}
}

Expand All @@ -41,6 +39,16 @@ type reconciler struct {
tenants multitenant.Tenants

lock sync.Mutex

forceVersions map[string][]core_model.ResourceType
forceVersionsLock sync.RWMutex
}

func (r *reconciler) ForceVersion(node *envoy_core.Node, resourceType core_model.ResourceType) {
nodeID := r.hasher.ID(node)
r.forceVersionsLock.Lock()
r.forceVersions[nodeID] = append(r.forceVersions[nodeID], resourceType)
r.forceVersionsLock.Unlock()
}

func (r *reconciler) Clear(ctx context.Context, node *envoy_core.Node) error {
Expand Down Expand Up @@ -88,79 +96,67 @@ func (r *reconciler) Reconcile(ctx context.Context, node *envoy_core.Node, chang
if new == nil {
return errors.New("nil snapshot"), false
}
// call ConstructVersionMap, so we can override versions if needed and compute what changed
if old != nil {
// this should already be computed by SetSnapshot, but we call it just to make sure we have versions.
if err := old.ConstructVersionMap(); err != nil {
return errors.Wrap(err, "could not construct version map"), false
}
}
if err := new.ConstructVersionMap(); err != nil {
return errors.Wrap(err, "could not construct version map"), false
}
r.forceNewVersion(new, id)

new, changed := r.Version(new, old)
if changed {
r.logChanges(logger, new, old, node)
r.meterConfigReadyForDelivery(new, old, node.Id)
if changed := r.changedTypes(old, new); len(changed) > 0 {
r.logChanges(logger, changed, node)
r.meterConfigReadyForDelivery(changed, node.Id)
return r.cache.SetSnapshot(ctx, id, new), true
}
return nil, false
}

func (r *reconciler) Version(new, old envoy_cache.ResourceSnapshot) (envoy_cache.ResourceSnapshot, bool) {
if new == nil {
return nil, false
}
changed := false
newResources := map[core_model.ResourceType]envoy_cache.Resources{}
func (r *reconciler) changedTypes(old, new envoy_cache.ResourceSnapshot) []core_model.ResourceType {
var changed []core_model.ResourceType
for _, typ := range util_kds_v2.GetSupportedTypes() {
version := new.GetVersion(typ)
if version != "" {
// favor a version assigned by resource generator
continue
}

if old != nil && r.equal(new.GetResources(typ), old.GetResources(typ)) {
version = old.GetVersion(typ)
}
if version == "" {
version = core.NewUUID()
changed = true
}
if new.GetVersion(typ) == version {
continue
if (old == nil && len(new.GetVersionMap(typ)) > 0) ||
(old != nil && !maps.Equal(old.GetVersionMap(typ), new.GetVersionMap(typ))) {
changed = append(changed, core_model.ResourceType(typ))
}
n := map[string]envoy_types.ResourceWithTTL{}
for k, v := range new.GetResourcesAndTTL(typ) {
n[k] = v
}
newResources[core_model.ResourceType(typ)] = envoy_cache.Resources{Version: version, Items: n}
}
return &cache_v2.Snapshot{
Resources: newResources,
}, changed
return changed
}

func (_ *reconciler) equal(new, old map[string]envoy_types.Resource) bool {
if len(new) != len(old) {
return false
}
for key, newValue := range new {
if oldValue, hasOldValue := old[key]; !hasOldValue || !proto.Equal(newValue, oldValue) {
return false
// see kdsRetryForcer for more information
func (r *reconciler) forceNewVersion(snapshot envoy_cache.ResourceSnapshot, id string) {
r.forceVersionsLock.Lock()
forceVersionsForTypes := r.forceVersions[id]
delete(r.forceVersions, id)
r.forceVersionsLock.Unlock()
for _, typ := range forceVersionsForTypes {
cacheSnapshot, ok := snapshot.(*cache_v2.Snapshot)
if !ok {
panic("invalid type of Snapshot")
}
for resourceName := range cacheSnapshot.VersionMap[typ] {
cacheSnapshot.VersionMap[typ][resourceName] = ""
}
}
return true
}

func (r *reconciler) logChanges(logger logr.Logger, new envoy_cache.ResourceSnapshot, old envoy_cache.ResourceSnapshot, node *envoy_core.Node) {
for _, typ := range util_kds_v2.GetSupportedTypes() {
if old != nil && old.GetVersion(typ) != new.GetVersion(typ) {
client := node.Id
if r.mode == config_core.Zone {
// we need to override client name because Zone is always a client to Global (on gRPC level)
client = "global"
}
logger.Info("detected changes in the resources. Sending changes to the client.", "resourceType", typ, "client", client) // todo is client needed?
func (r *reconciler) logChanges(logger logr.Logger, changedTypes []core_model.ResourceType, node *envoy_core.Node) {
for _, typ := range changedTypes {
client := node.Id
if r.mode == config_core.Zone {
// we need to override client name because Zone is always a client to Global (on gRPC level)
client = "global"
}
logger.Info("detected changes in the resources. Sending changes to the client.", "resourceType", typ, "client", client) // todo is client needed?
}
}

func (r *reconciler) meterConfigReadyForDelivery(new envoy_cache.ResourceSnapshot, old envoy_cache.ResourceSnapshot, nodeID string) {
for _, typ := range util_kds_v2.GetSupportedTypes() {
if old == nil || old.GetVersion(typ) != new.GetVersion(typ) {
r.statsCallbacks.ConfigReadyForDelivery(nodeID + typ)
}
func (r *reconciler) meterConfigReadyForDelivery(changedTypes []core_model.ResourceType, nodeID string) {
for _, typ := range changedTypes {
r.statsCallbacks.ConfigReadyForDelivery(nodeID + string(typ))
}
}
96 changes: 96 additions & 0 deletions pkg/kds/v2/reconcile/reconciler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package reconcile_test

import (
"context"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

config_core "github.com/kumahq/kuma/pkg/config/core"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/kds/v2/reconcile"
"github.com/kumahq/kuma/pkg/kds/v2/server"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/multitenant"
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
"github.com/kumahq/kuma/pkg/test/resources/samples"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
)

var _ = Describe("Reconciler", func() {
var reconciler reconcile.Reconciler
var store core_store.ResourceStore
var snapshotCache envoy_cache.SnapshotCache

node := &envoy_core.Node{
Id: "a",
}
changedTypes := map[core_model.ResourceType]struct{}{
core_mesh.MeshType: {},
}

BeforeEach(func() {
store = memory.NewStore()
generator := reconcile.NewSnapshotGenerator(core_manager.NewResourceManager(store), reconcile.Any, reconcile.NoopResourceMapper)
hasher := &server.Hasher{}
snapshotCache = envoy_cache.NewSnapshotCache(false, hasher, util_xds.NewLogger(logr.Discard()))
metrics, err := core_metrics.NewMetrics("zone-1")
Expect(err).ToNot(HaveOccurred())
statsCallbacks, err := util_xds.NewStatsCallbacks(metrics, "kds_delta")
Expect(err).ToNot(HaveOccurred())
reconciler = reconcile.NewReconciler(hasher, snapshotCache, generator, config_core.Zone, statsCallbacks, multitenant.SingleTenant)
})

It("should reconcile snapshot in snapshot cache", func() {
// given
Expect(store.Create(context.Background(), samples.MeshDefault(), core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh))).To(Succeed())

// when
err, changed := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeTrue())
snapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
Expect(snapshot.GetResources(string(core_mesh.MeshType))).To(HaveLen(1))

// when reconciled again without resource changes
err, changed = reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeFalse())
newSnapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
Expect(snapshot).To(BeIdenticalTo(newSnapshot))
})

It("should force new version in snapshot cache", func() {
// given
Expect(store.Create(context.Background(), samples.MeshDefault(), core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh))).To(Succeed())

err, _ := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())
Expect(err).ToNot(HaveOccurred())
snapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())

// when
reconciler.ForceVersion(node, core_mesh.MeshType)
err, changed := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeTrue())
newSnapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
Expect(snapshot.GetResources(string(core_mesh.MeshType))).To(Equal(newSnapshot.GetResources(string(core_mesh.MeshType))))
Expect(snapshot.GetVersionMap(string(core_mesh.MeshType))).ToNot(Equal(newSnapshot.GetVersionMap(string(core_mesh.MeshType))))
})
})
8 changes: 4 additions & 4 deletions pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func New(
util_xds_v3.AdaptDeltaCallbacks(util_xds.LoggingCallbacks{Log: log}),
util_xds_v3.AdaptDeltaCallbacks(statsCallbacks),
// util_xds_v3.AdaptDeltaCallbacks(NewNackBackoff(nackBackoff)),
newKdsRetryForcer(log, cache, hasher),
newKdsRetryForcer(log, reconciler.ForceVersion, nackBackoff, rt.EventBus()),
syncTracker,
}
if insight {
Expand Down Expand Up @@ -173,14 +173,14 @@ func newSyncTracker(
}

func newKDSContext(log logr.Logger) (envoy_cache.NodeHash, envoy_cache.SnapshotCache) { //nolint:unparam
hasher := hasher{}
hasher := Hasher{}
logger := util_xds.NewLogger(log)
return hasher, envoy_cache.NewSnapshotCache(false, hasher, logger)
}

type hasher struct{}
type Hasher struct{}

func (_ hasher) ID(node *envoy_core.Node) string {
func (_ Hasher) ID(node *envoy_core.Node) string {
tenantID, found := util.TenantFromMetadata(node)
if !found {
return node.Id
Expand Down
31 changes: 17 additions & 14 deletions pkg/kds/v2/server/event_based_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,14 @@ var _ util_watchdog.Watchdog = &EventBasedWatchdog{}
func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
tenantID, _ := multitenant.TenantFromCtx(e.Ctx)
listener := e.EventBus.Subscribe(func(event events.Event) bool {
resChange, ok := event.(events.ResourceChangedEvent)
if !ok {
return false
switch ev := event.(type) {
case events.ResourceChangedEvent:
_, ok := e.ProvidedTypes[ev.Type]
return ok && ev.TenantID == tenantID
case events.TriggerKDSResyncEvent:
return ev.NodeID == e.Node.Id
}
if resChange.TenantID != tenantID {
return false
}
if _, ok := e.ProvidedTypes[resChange.Type]; !ok {
return false
}
return true
return false
})
flushTicker := e.NewFlushTicker()
defer flushTicker.Stop()
Expand Down Expand Up @@ -95,10 +92,16 @@ func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
changedTypes = maps.Clone(e.ProvidedTypes)
reasons[ReasonResync] = struct{}{}
case event := <-listener.Recv():
resChange := event.(events.ResourceChangedEvent)
e.Log.V(1).Info("schedule sync for type", "typ", resChange.Type)
changedTypes[resChange.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
switch ev := event.(type) {
case events.ResourceChangedEvent:
e.Log.V(1).Info("schedule sync for type", "typ", ev.Type, "event", "ResourceChanged")
changedTypes[ev.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
case events.TriggerKDSResyncEvent:
e.Log.V(1).Info("schedule sync for type", "typ", ev.Type, "event", "TriggerKDSResync")
changedTypes[ev.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
}
}
}
}
Loading

0 comments on commit 99ee1a4

Please sign in to comment.