Skip to content

Commit

Permalink
fix(kuma-cp): consistently check for expiring ZoneIngress/ZoneEgress …
Browse files Browse the repository at this point in the history
…certs (backport of #10160, #10162, #10161) (#10166)

* fix(kuma-cp): consistently check for expiring ZoneIngress/ZoneEgress certs (#10160)

Signed-off-by: Mike Beaumont <[email protected]>

* fix(kuma-cp): cleanup generated egress certs (#10162)

Signed-off-by: Mike Beaumont <[email protected]>
Co-authored-by: Bart Smykla <[email protected]>

* fix(kuma-cp): index generated certs by proxy type (#10161)

Signed-off-by: Mike Beaumont <[email protected]>

---------

Signed-off-by: Mike Beaumont <[email protected]>
Co-authored-by: Mike Beaumont <[email protected]>
Co-authored-by: Bart Smykla <[email protected]>
  • Loading branch information
3 people authored May 6, 2024
1 parent af3a873 commit 456b88a
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 35 deletions.
20 changes: 20 additions & 0 deletions pkg/core/resources/apis/mesh/proxy_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mesh

import (
"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
)

// ProxyTypeFromResourceType returns the proxy type that corresponds to the
// given resource type.
//
// Only DataplaneType, ZoneIngressType and ZoneEgressType have a mapping. For
// any other resource type an empty proxy type is returned together with an
// error naming the offending resource type.
func ProxyTypeFromResourceType(t core_model.ResourceType) (mesh_proto.ProxyType, error) {
	var proxyType mesh_proto.ProxyType

	switch t {
	case DataplaneType:
		proxyType = mesh_proto.DataplaneProxyType
	case ZoneIngressType:
		proxyType = mesh_proto.IngressProxyType
	case ZoneEgressType:
		proxyType = mesh_proto.EgressProxyType
	default:
		// No proxy is backed by this kind of resource.
		return "", errors.Errorf("%s does not have a corresponding proxy type", t)
	}

	return proxyType, nil
}
4 changes: 2 additions & 2 deletions pkg/test/xds/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ func (ts *TestSecrets) GetAllInOne(
return identity, allInOne, nil
}

func (*TestSecrets) Info(model.ResourceKey) *secrets.Info {
func (*TestSecrets) Info(mesh_proto.ProxyType, model.ResourceKey) *secrets.Info {
return TestSecretsInfo
}

func (*TestSecrets) Cleanup(model.ResourceKey) {
func (*TestSecrets) Cleanup(mesh_proto.ProxyType, model.ResourceKey) {
}

var _ secrets.Secrets = &TestSecrets{}
44 changes: 29 additions & 15 deletions pkg/xds/secrets/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type Secrets interface {
GetForDataPlane(ctx context.Context, dataplane *core_mesh.DataplaneResource, mesh *core_mesh.MeshResource, otherMeshes []*core_mesh.MeshResource) (*core_xds.IdentitySecret, map[string]*core_xds.CaSecret, error)
GetForZoneEgress(ctx context.Context, zoneEgress *core_mesh.ZoneEgressResource, mesh *core_mesh.MeshResource) (*core_xds.IdentitySecret, *core_xds.CaSecret, error)
GetAllInOne(ctx context.Context, mesh *core_mesh.MeshResource, dataplane *core_mesh.DataplaneResource, otherMeshes []*core_mesh.MeshResource) (*core_xds.IdentitySecret, *core_xds.CaSecret, error)
Info(dpKey model.ResourceKey) *Info
Cleanup(dpKey model.ResourceKey)
Info(mesh_proto.ProxyType, model.ResourceKey) *Info
Cleanup(mesh_proto.ProxyType, model.ResourceKey)
}

type MeshInfo struct {
Expand Down Expand Up @@ -78,24 +78,29 @@ func NewSecrets(caProvider CaProvider, identityProvider IdentityProvider, metric
return &secrets{
caProvider: caProvider,
identityProvider: identityProvider,
cachedCerts: map[model.ResourceKey]*certs{},
cachedCerts: map[certCacheKey]*certs{},
certGenerationsMetric: certGenerationsMetric,
}, nil
}

// certCacheKey is the key type of secrets.cachedCerts. It pairs a resource key
// with a proxy type, so cached certs are looked up, stored and deleted per
// (proxy type, resource key) combination rather than per resource key alone.
type certCacheKey struct {
// resource is the key of the resource the certs were generated for.
resource model.ResourceKey
// proxyType is the type of proxy the certs were generated for.
proxyType mesh_proto.ProxyType
}

type secrets struct {
caProvider CaProvider
identityProvider IdentityProvider

sync.RWMutex
cachedCerts map[model.ResourceKey]*certs
cachedCerts map[certCacheKey]*certs
certGenerationsMetric *prometheus.CounterVec
}

var _ Secrets = &secrets{}

func (s *secrets) Info(dpKey model.ResourceKey) *Info {
certs := s.certs(dpKey)
func (s *secrets) Info(proxyType mesh_proto.ProxyType, dpKey model.ResourceKey) *Info {
certs := s.certs(proxyType, dpKey)
if certs == nil {
return nil
}
Expand All @@ -117,11 +122,11 @@ func (c *certs) Info() *Info {
return c.info
}

func (s *secrets) certs(dpKey model.ResourceKey) *certs {
func (s *secrets) certs(proxyType mesh_proto.ProxyType, dpKey model.ResourceKey) *certs {
s.RLock()
defer s.RUnlock()

return s.cachedCerts[dpKey]
return s.cachedCerts[certCacheKey{proxyType: proxyType, resource: dpKey}]
}

func (s *secrets) GetForDataPlane(
Expand All @@ -130,7 +135,7 @@ func (s *secrets) GetForDataPlane(
mesh *core_mesh.MeshResource,
otherMeshes []*core_mesh.MeshResource,
) (*core_xds.IdentitySecret, map[string]*core_xds.CaSecret, error) {
identity, cas, _, err := s.get(ctx, dataplane, dataplane.Spec.TagSet(), mesh, otherMeshes)
identity, cas, _, err := s.get(ctx, mesh_proto.DataplaneProxyType, dataplane, dataplane.Spec.TagSet(), mesh, otherMeshes)
return identity, cas, err
}

Expand All @@ -140,7 +145,7 @@ func (s *secrets) GetAllInOne(
dataplane *core_mesh.DataplaneResource,
otherMeshes []*core_mesh.MeshResource,
) (*core_xds.IdentitySecret, *core_xds.CaSecret, error) {
identity, _, allInOne, err := s.get(ctx, dataplane, dataplane.Spec.TagSet(), mesh, otherMeshes)
identity, _, allInOne, err := s.get(ctx, mesh_proto.DataplaneProxyType, dataplane, dataplane.Spec.TagSet(), mesh, otherMeshes)
return identity, allInOne.CaSecret, err
}

Expand All @@ -155,12 +160,13 @@ func (s *secrets) GetForZoneEgress(
},
})

identity, cas, _, err := s.get(ctx, zoneEgress, tags, mesh, nil)
identity, cas, _, err := s.get(ctx, mesh_proto.EgressProxyType, zoneEgress, tags, mesh, nil)
return identity, cas[mesh.GetMeta().GetName()], err
}

func (s *secrets) get(
ctx context.Context,
proxyType mesh_proto.ProxyType,
resource model.Resource,
tags mesh_proto.MultiValueTagSet,
mesh *core_mesh.MeshResource,
Expand All @@ -174,7 +180,7 @@ func (s *secrets) get(

resourceKey := model.MetaToResourceKey(resource.GetMeta())
resourceKey.Mesh = meshName
certs := s.certs(resourceKey)
certs := s.certs(proxyType, resourceKey)

if updateKinds, debugReason := s.shouldGenerateCerts(
certs.Info(),
Expand All @@ -192,8 +198,12 @@ func (s *secrets) get(
return nil, nil, MeshCa{}, errors.Wrap(err, "could not generate certificates")
}

key := certCacheKey{
resource: resourceKey,
proxyType: proxyType,
}
s.Lock()
s.cachedCerts[resourceKey] = certs
s.cachedCerts[key] = certs
s.Unlock()

caMap := map[string]*core_xds.CaSecret{
Expand All @@ -219,9 +229,13 @@ func (s *secrets) get(
return certs.identity, caMap, certs.allInOneCa, nil
}

func (s *secrets) Cleanup(dpKey model.ResourceKey) {
func (s *secrets) Cleanup(proxyType mesh_proto.ProxyType, dpKey model.ResourceKey) {
key := certCacheKey{
resource: dpKey,
proxyType: proxyType,
}
s.Lock()
delete(s.cachedCerts, dpKey)
delete(s.cachedCerts, key)
s.Unlock()
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/xds/secrets/secrets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ var _ = Describe("Secrets", Ordered, func() {
Expect(ca["default"].PemCerts).ToNot(BeEmpty())

// and info is stored
info := secrets.Info(core_model.MetaToResourceKey(newDataplane().Meta))
info := secrets.Info(mesh_proto.DataplaneProxyType, core_model.MetaToResourceKey(newDataplane().Meta))
Expect(info.Generation).To(Equal(now))
Expect(info.Expiration.Unix()).To(Equal(now.Add(1 * time.Hour).Unix()))
Expect(info.OwnMesh.MTLS.EnabledBackend).To(Equal("ca-1"))
Expand Down Expand Up @@ -237,7 +237,7 @@ var _ = Describe("Secrets", Ordered, func() {

It("when cert was cleaned up", func() {
// given
secrets.Cleanup(core_model.MetaToResourceKey(newDataplane().Meta))
secrets.Cleanup(mesh_proto.DataplaneProxyType, core_model.MetaToResourceKey(newDataplane().Meta))

// when
_, _, err := secrets.GetForDataPlane(context.Background(), newDataplane(), newMesh(), nil)
Expand All @@ -255,10 +255,10 @@ var _ = Describe("Secrets", Ordered, func() {
Expect(err).ToNot(HaveOccurred())

// when
secrets.Cleanup(core_model.MetaToResourceKey(newDataplane().Meta))
secrets.Cleanup(mesh_proto.DataplaneProxyType, core_model.MetaToResourceKey(newDataplane().Meta))

// then
Expect(secrets.Info(core_model.MetaToResourceKey(newDataplane().Meta))).To(BeNil())
Expect(secrets.Info(mesh_proto.DataplaneProxyType, core_model.MetaToResourceKey(newDataplane().Meta))).To(BeNil())
})
})

Expand All @@ -274,7 +274,7 @@ var _ = Describe("Secrets", Ordered, func() {
Expect(ca.PemCerts).ToNot(BeEmpty())

// and info is stored
info := secrets.Info(core_model.MetaToResourceKey(newZoneEgress().Meta))
info := secrets.Info(mesh_proto.EgressProxyType, core_model.MetaToResourceKey(newZoneEgress().Meta))
Expect(info.Generation).To(Equal(now))
Expect(info.Expiration.Unix()).To(Equal(now.Add(1 * time.Hour).Unix()))
Expect(info.OwnMesh.MTLS.EnabledBackend).To(Equal("ca-1"))
Expand Down Expand Up @@ -340,7 +340,7 @@ var _ = Describe("Secrets", Ordered, func() {

It("when cert was cleaned up", func() {
// given
secrets.Cleanup(core_model.MetaToResourceKey(newZoneEgress().Meta))
secrets.Cleanup(mesh_proto.EgressProxyType, core_model.MetaToResourceKey(newZoneEgress().Meta))

// when
_, _, err := secrets.GetForZoneEgress(context.Background(), newZoneEgress(), newMesh())
Expand All @@ -358,10 +358,10 @@ var _ = Describe("Secrets", Ordered, func() {
Expect(err).ToNot(HaveOccurred())

// when
secrets.Cleanup(core_model.MetaToResourceKey(newZoneEgress().Meta))
secrets.Cleanup(mesh_proto.EgressProxyType, core_model.MetaToResourceKey(newZoneEgress().Meta))

// then
Expect(secrets.Info(core_model.MetaToResourceKey(newZoneEgress().Meta))).To(BeNil())
Expect(secrets.Info(mesh_proto.EgressProxyType, core_model.MetaToResourceKey(newZoneEgress().Meta))).To(BeNil())
})
})
})
10 changes: 8 additions & 2 deletions pkg/xds/server/callbacks/dataplane_status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var sinkLog = core.Log.WithName("xds").WithName("sink")

type DataplaneInsightSink interface {
Start(stop <-chan struct{})
Start(stop <-chan struct{}) // TODO error
}

type DataplaneInsightStore interface {
Expand Down Expand Up @@ -73,9 +73,15 @@ func (s *dataplaneInsightSink) Start(stop <-chan struct{}) {
var lastStoredSecretsInfo *secrets.Info
var generation uint32

proxyType, err := core_mesh.ProxyTypeFromResourceType(s.dataplaneType)
if err != nil {
sinkLog.Error(err, "failed to create dataplaneInsightSink")
return
}

flush := func(closing bool) {
dataplaneID, currentState := s.accessor.GetStatus()
secretsInfo := s.secrets.Info(dataplaneID)
secretsInfo := s.secrets.Info(proxyType, dataplaneID)

select {
case <-generationTicker.C:
Expand Down
54 changes: 46 additions & 8 deletions pkg/xds/sync/dataplane_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"context"
std_errors "errors"

"github.com/go-logr/logr"
"github.com/pkg/errors"
Expand Down Expand Up @@ -91,12 +92,23 @@ func (d *DataplaneWatchdog) Cleanup() error {
proxyID := core_xds.FromResourceKey(d.key)
switch d.dpType {
case mesh_proto.DataplaneProxyType:
d.EnvoyCpCtx.Secrets.Cleanup(d.key)
d.EnvoyCpCtx.Secrets.Cleanup(mesh_proto.DataplaneProxyType, d.key)
return d.DataplaneReconciler.Clear(&proxyID)
case mesh_proto.IngressProxyType:
return d.IngressReconciler.Clear(&proxyID)
case mesh_proto.EgressProxyType:
return d.EgressReconciler.Clear(&proxyID)
aggregatedMeshCtxs, aggregateMeshContextsErr := xds_context.AggregateMeshContexts(
context.TODO(),
d.ResManager,
d.MeshCache.GetMeshContext,
)
for _, mesh := range aggregatedMeshCtxs.Meshes {
d.EnvoyCpCtx.Secrets.Cleanup(
mesh_proto.EgressProxyType,
core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name},
)
}
return std_errors.Join(aggregateMeshContextsErr, d.EgressReconciler.Clear(&proxyID))
default:
return nil
}
Expand All @@ -110,7 +122,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xd
return SyncResult{}, errors.Wrap(err, "could not get mesh context")
}

certInfo := d.EnvoyCpCtx.Secrets.Info(d.key)
certInfo := d.EnvoyCpCtx.Secrets.Info(mesh_proto.DataplaneProxyType, d.key)
syncForCert := certInfo != nil && certInfo.ExpiringSoon() // check if we need to regenerate config because identity cert is expiring soon.
syncForConfig := meshCtx.Hash != d.lastHash // check if we need to regenerate config because Kuma policies has changed.
result := SyncResult{
Expand Down Expand Up @@ -142,7 +154,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xd
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
if !envoyCtx.Mesh.Resource.MTLSEnabled() {
d.EnvoyCpCtx.Secrets.Cleanup(d.key) // we need to cleanup secrets if mtls is disabled
d.EnvoyCpCtx.Secrets.Cleanup(mesh_proto.DataplaneProxyType, d.key) // we need to cleanup secrets if mtls is disabled
}
proxy.Metadata = metadata
changed, err := d.DataplaneReconciler.Reconcile(ctx, *envoyCtx, proxy)
Expand Down Expand Up @@ -176,13 +188,26 @@ func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.
ProxyType: mesh_proto.IngressProxyType,
}
syncForConfig := aggregatedMeshCtxs.Hash != d.lastHash
if !syncForConfig {
var syncForCert bool
for _, mesh := range aggregatedMeshCtxs.Meshes {
certInfo := d.EnvoyCpCtx.Secrets.Info(
mesh_proto.IngressProxyType,
core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name},
)
syncForCert = syncForCert || (certInfo != nil && certInfo.ExpiringSoon()) // check if we need to regenerate config because identity cert is expiring soon.
}
if !syncForConfig && !syncForCert {
result.Status = SkipStatus
return result, nil
}

d.log.V(1).Info("snapshot hash updated, reconcile", "prev", d.lastHash, "current", aggregatedMeshCtxs.Hash)
d.lastHash = aggregatedMeshCtxs.Hash
if syncForConfig {
d.log.V(1).Info("snapshot hash updated, reconcile", "prev", d.lastHash, "current", aggregatedMeshCtxs.Hash)
}
if syncForCert {
d.log.V(1).Info("certs expiring soon, reconcile")
}

proxy, err := d.IngressProxyBuilder.Build(ctx, d.key, aggregatedMeshCtxs)
if err != nil {
Expand Down Expand Up @@ -224,13 +249,26 @@ func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.D
ProxyType: mesh_proto.EgressProxyType,
}
syncForConfig := aggregatedMeshCtxs.Hash != d.lastHash
if !syncForConfig {
var syncForCert bool
for _, mesh := range aggregatedMeshCtxs.Meshes {
certInfo := d.EnvoyCpCtx.Secrets.Info(
mesh_proto.EgressProxyType,
core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name},
)
syncForCert = syncForCert || (certInfo != nil && certInfo.ExpiringSoon()) // check if we need to regenerate config because identity cert is expiring soon.
}
if !syncForConfig && !syncForCert {
result.Status = SkipStatus
return result, nil
}

d.log.V(1).Info("snapshot hash updated, reconcile", "prev", d.lastHash, "current", aggregatedMeshCtxs.Hash)
d.lastHash = aggregatedMeshCtxs.Hash
if syncForConfig {
d.log.V(1).Info("snapshot hash updated, reconcile", "prev", d.lastHash, "current", aggregatedMeshCtxs.Hash)
}
if syncForCert {
d.log.V(1).Info("certs expiring soon, reconcile")
}

proxy, err := d.EgressProxyBuilder.Build(ctx, d.key, aggregatedMeshCtxs)
if err != nil {
Expand Down

0 comments on commit 456b88a

Please sign in to comment.