Skip to content

Commit

Permalink
fix(kuma-cp): index generated certs by proxy type (#10161)
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Beaumont <[email protected]>
  • Loading branch information
michaelbeaumont committed May 6, 2024
1 parent 353cab0 commit faf4649
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 33 deletions.
20 changes: 20 additions & 0 deletions pkg/core/resources/apis/mesh/proxy_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mesh

import (
"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
)

func ProxyTypeFromResourceType(t core_model.ResourceType) (mesh_proto.ProxyType, error) {
switch t {
case DataplaneType:
return mesh_proto.DataplaneProxyType, nil
case ZoneIngressType:
return mesh_proto.IngressProxyType, nil
case ZoneEgressType:
return mesh_proto.EgressProxyType, nil
}
return "", errors.Errorf("%s does not have a corresponding proxy type", t)
}
4 changes: 2 additions & 2 deletions pkg/test/xds/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ func (ts *TestSecrets) GetAllInOne(
return identity, allInOne, nil
}

func (*TestSecrets) Info(model.ResourceKey) *secrets.Info {
func (*TestSecrets) Info(mesh_proto.ProxyType, model.ResourceKey) *secrets.Info {
return TestSecretsInfo
}

func (*TestSecrets) Cleanup(model.ResourceKey) {
func (*TestSecrets) Cleanup(mesh_proto.ProxyType, model.ResourceKey) {
}

var _ secrets.Secrets = &TestSecrets{}
44 changes: 29 additions & 15 deletions pkg/xds/secrets/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type Secrets interface {
GetForDataPlane(ctx context.Context, dataplane *core_mesh.DataplaneResource, mesh *core_mesh.MeshResource, otherMeshes []*core_mesh.MeshResource) (*core_xds.IdentitySecret, map[string]*core_xds.CaSecret, error)
GetForZoneEgress(ctx context.Context, zoneEgress *core_mesh.ZoneEgressResource, mesh *core_mesh.MeshResource) (*core_xds.IdentitySecret, *core_xds.CaSecret, error)
GetAllInOne(ctx context.Context, mesh *core_mesh.MeshResource, dataplane *core_mesh.DataplaneResource, otherMeshes []*core_mesh.MeshResource) (*core_xds.IdentitySecret, *core_xds.CaSecret, error)
Info(dpKey model.ResourceKey) *Info
Cleanup(dpKey model.ResourceKey)
Info(mesh_proto.ProxyType, model.ResourceKey) *Info
Cleanup(mesh_proto.ProxyType, model.ResourceKey)
}

type MeshInfo struct {
Expand Down Expand Up @@ -78,24 +78,29 @@ func NewSecrets(caProvider CaProvider, identityProvider IdentityProvider, metric
return &secrets{
caProvider: caProvider,
identityProvider: identityProvider,
cachedCerts: map[model.ResourceKey]*certs{},
cachedCerts: map[certCacheKey]*certs{},
certGenerationsMetric: certGenerationsMetric,
}, nil
}

type certCacheKey struct {
resource model.ResourceKey
proxyType mesh_proto.ProxyType
}

type secrets struct {
caProvider CaProvider
identityProvider IdentityProvider

sync.RWMutex
cachedCerts map[model.ResourceKey]*certs
cachedCerts map[certCacheKey]*certs
certGenerationsMetric *prometheus.CounterVec
}

var _ Secrets = &secrets{}

func (s *secrets) Info(dpKey model.ResourceKey) *Info {
certs := s.certs(dpKey)
func (s *secrets) Info(proxyType mesh_proto.ProxyType, dpKey model.ResourceKey) *Info {
certs := s.certs(proxyType, dpKey)
if certs == nil {
return nil
}
Expand All @@ -117,11 +122,11 @@ func (c *certs) Info() *Info {
return c.info
}

func (s *secrets) certs(dpKey model.ResourceKey) *certs {
func (s *secrets) certs(proxyType mesh_proto.ProxyType, dpKey model.ResourceKey) *certs {
s.RLock()
defer s.RUnlock()

return s.cachedCerts[dpKey]
return s.cachedCerts[certCacheKey{proxyType: proxyType, resource: dpKey}]
}

func (s *secrets) GetForDataPlane(
Expand All @@ -130,7 +135,7 @@ func (s *secrets) GetForDataPlane(
mesh *core_mesh.MeshResource,
otherMeshes []*core_mesh.MeshResource,
) (*core_xds.IdentitySecret, map[string]*core_xds.CaSecret, error) {
identity, cas, _, err := s.get(ctx, dataplane, dataplane.Spec.TagSet(), mesh, otherMeshes)
identity, cas, _, err := s.get(ctx, mesh_proto.DataplaneProxyType, dataplane, dataplane.Spec.TagSet(), mesh, otherMeshes)
return identity, cas, err
}

Expand All @@ -140,7 +145,7 @@ func (s *secrets) GetAllInOne(
dataplane *core_mesh.DataplaneResource,
otherMeshes []*core_mesh.MeshResource,
) (*core_xds.IdentitySecret, *core_xds.CaSecret, error) {
identity, _, allInOne, err := s.get(ctx, dataplane, dataplane.Spec.TagSet(), mesh, otherMeshes)
identity, _, allInOne, err := s.get(ctx, mesh_proto.DataplaneProxyType, dataplane, dataplane.Spec.TagSet(), mesh, otherMeshes)
return identity, allInOne.CaSecret, err
}

Expand All @@ -155,12 +160,13 @@ func (s *secrets) GetForZoneEgress(
},
})

identity, cas, _, err := s.get(ctx, zoneEgress, tags, mesh, nil)
identity, cas, _, err := s.get(ctx, mesh_proto.EgressProxyType, zoneEgress, tags, mesh, nil)
return identity, cas[mesh.GetMeta().GetName()], err
}

func (s *secrets) get(
ctx context.Context,
proxyType mesh_proto.ProxyType,
resource model.Resource,
tags mesh_proto.MultiValueTagSet,
mesh *core_mesh.MeshResource,
Expand All @@ -174,7 +180,7 @@ func (s *secrets) get(

resourceKey := model.MetaToResourceKey(resource.GetMeta())
resourceKey.Mesh = meshName
certs := s.certs(resourceKey)
certs := s.certs(proxyType, resourceKey)

if updateKinds, debugReason := s.shouldGenerateCerts(
certs.Info(),
Expand All @@ -192,8 +198,12 @@ func (s *secrets) get(
return nil, nil, MeshCa{}, errors.Wrap(err, "could not generate certificates")
}

key := certCacheKey{
resource: resourceKey,
proxyType: proxyType,
}
s.Lock()
s.cachedCerts[resourceKey] = certs
s.cachedCerts[key] = certs
s.Unlock()

caMap := map[string]*core_xds.CaSecret{
Expand All @@ -219,9 +229,13 @@ func (s *secrets) get(
return certs.identity, caMap, certs.allInOneCa, nil
}

func (s *secrets) Cleanup(dpKey model.ResourceKey) {
func (s *secrets) Cleanup(proxyType mesh_proto.ProxyType, dpKey model.ResourceKey) {
key := certCacheKey{
resource: dpKey,
proxyType: proxyType,
}
s.Lock()
delete(s.cachedCerts, dpKey)
delete(s.cachedCerts, key)
s.Unlock()
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/xds/secrets/secrets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ var _ = Describe("Secrets", Ordered, func() {
Expect(ca["default"].PemCerts).ToNot(BeEmpty())

// and info is stored
info := secrets.Info(core_model.MetaToResourceKey(newDataplane().Meta))
info := secrets.Info(mesh_proto.DataplaneProxyType, core_model.MetaToResourceKey(newDataplane().Meta))
Expect(info.Generation).To(Equal(now))
Expect(info.Expiration.Unix()).To(Equal(now.Add(1 * time.Hour).Unix()))
Expect(info.OwnMesh.MTLS.EnabledBackend).To(Equal("ca-1"))
Expand Down Expand Up @@ -237,7 +237,7 @@ var _ = Describe("Secrets", Ordered, func() {

It("when cert was cleaned up", func() {
// given
secrets.Cleanup(core_model.MetaToResourceKey(newDataplane().Meta))
secrets.Cleanup(mesh_proto.DataplaneProxyType, core_model.MetaToResourceKey(newDataplane().Meta))

// when
_, _, err := secrets.GetForDataPlane(context.Background(), newDataplane(), newMesh(), nil)
Expand All @@ -255,10 +255,10 @@ var _ = Describe("Secrets", Ordered, func() {
Expect(err).ToNot(HaveOccurred())

// when
secrets.Cleanup(core_model.MetaToResourceKey(newDataplane().Meta))
secrets.Cleanup(mesh_proto.DataplaneProxyType, core_model.MetaToResourceKey(newDataplane().Meta))

// then
Expect(secrets.Info(core_model.MetaToResourceKey(newDataplane().Meta))).To(BeNil())
Expect(secrets.Info(mesh_proto.DataplaneProxyType, core_model.MetaToResourceKey(newDataplane().Meta))).To(BeNil())
})
})

Expand All @@ -274,7 +274,7 @@ var _ = Describe("Secrets", Ordered, func() {
Expect(ca.PemCerts).ToNot(BeEmpty())

// and info is stored
info := secrets.Info(core_model.MetaToResourceKey(newZoneEgress().Meta))
info := secrets.Info(mesh_proto.EgressProxyType, core_model.MetaToResourceKey(newZoneEgress().Meta))
Expect(info.Generation).To(Equal(now))
Expect(info.Expiration.Unix()).To(Equal(now.Add(1 * time.Hour).Unix()))
Expect(info.OwnMesh.MTLS.EnabledBackend).To(Equal("ca-1"))
Expand Down Expand Up @@ -340,7 +340,7 @@ var _ = Describe("Secrets", Ordered, func() {

It("when cert was cleaned up", func() {
// given
secrets.Cleanup(core_model.MetaToResourceKey(newZoneEgress().Meta))
secrets.Cleanup(mesh_proto.EgressProxyType, core_model.MetaToResourceKey(newZoneEgress().Meta))

// when
_, _, err := secrets.GetForZoneEgress(context.Background(), newZoneEgress(), newMesh())
Expand All @@ -358,10 +358,10 @@ var _ = Describe("Secrets", Ordered, func() {
Expect(err).ToNot(HaveOccurred())

// when
secrets.Cleanup(core_model.MetaToResourceKey(newZoneEgress().Meta))
secrets.Cleanup(mesh_proto.EgressProxyType, core_model.MetaToResourceKey(newZoneEgress().Meta))

// then
Expect(secrets.Info(core_model.MetaToResourceKey(newZoneEgress().Meta))).To(BeNil())
Expect(secrets.Info(mesh_proto.EgressProxyType, core_model.MetaToResourceKey(newZoneEgress().Meta))).To(BeNil())
})
})
})
10 changes: 8 additions & 2 deletions pkg/xds/server/callbacks/dataplane_status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var sinkLog = core.Log.WithName("xds").WithName("sink")

type DataplaneInsightSink interface {
Start(stop <-chan struct{})
Start(stop <-chan struct{}) // TODO error
}

type DataplaneInsightStore interface {
Expand Down Expand Up @@ -73,9 +73,15 @@ func (s *dataplaneInsightSink) Start(stop <-chan struct{}) {
var lastStoredSecretsInfo *secrets.Info
var generation uint32

proxyType, err := core_mesh.ProxyTypeFromResourceType(s.dataplaneType)
if err != nil {
sinkLog.Error(err, "failed to create dataplaneInsightSink")
return
}

flush := func(closing bool) {
dataplaneID, currentState := s.accessor.GetStatus()
secretsInfo := s.secrets.Info(dataplaneID)
secretsInfo := s.secrets.Info(proxyType, dataplaneID)

select {
case <-generationTicker.C:
Expand Down
21 changes: 15 additions & 6 deletions pkg/xds/sync/dataplane_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (d *DataplaneWatchdog) Cleanup() error {
proxyID := core_xds.FromResourceKey(d.key)
switch d.dpType {
case mesh_proto.DataplaneProxyType:
d.EnvoyCpCtx.Secrets.Cleanup(d.key)
d.EnvoyCpCtx.Secrets.Cleanup(mesh_proto.DataplaneProxyType, d.key)
return d.DataplaneReconciler.Clear(&proxyID)
case mesh_proto.IngressProxyType:
return d.IngressReconciler.Clear(&proxyID)
Expand All @@ -103,7 +103,10 @@ func (d *DataplaneWatchdog) Cleanup() error {
d.MeshCache.GetMeshContext,
)
for _, mesh := range aggregatedMeshCtxs.Meshes {
d.EnvoyCpCtx.Secrets.Cleanup(core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name})
d.EnvoyCpCtx.Secrets.Cleanup(
mesh_proto.EgressProxyType,
core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name},
)
}
return std_errors.Join(aggregateMeshContextsErr, d.EgressReconciler.Clear(&proxyID))
default:
Expand All @@ -119,7 +122,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xd
return SyncResult{}, errors.Wrap(err, "could not get mesh context")
}

certInfo := d.EnvoyCpCtx.Secrets.Info(d.key)
certInfo := d.EnvoyCpCtx.Secrets.Info(mesh_proto.DataplaneProxyType, d.key)
syncForCert := certInfo != nil && certInfo.ExpiringSoon() // check if we need to regenerate config because identity cert is expiring soon.
syncForConfig := meshCtx.Hash != d.lastHash // check if we need to regenerate config because Kuma policies has changed.
result := SyncResult{
Expand Down Expand Up @@ -151,7 +154,7 @@ func (d *DataplaneWatchdog) syncDataplane(ctx context.Context, metadata *core_xd
}
proxy.EnvoyAdminMTLSCerts = envoyAdminMTLS
if !envoyCtx.Mesh.Resource.MTLSEnabled() {
d.EnvoyCpCtx.Secrets.Cleanup(d.key) // we need to cleanup secrets if mtls is disabled
d.EnvoyCpCtx.Secrets.Cleanup(mesh_proto.DataplaneProxyType, d.key) // we need to cleanup secrets if mtls is disabled
}
proxy.Metadata = metadata
changed, err := d.DataplaneReconciler.Reconcile(ctx, *envoyCtx, proxy)
Expand Down Expand Up @@ -187,7 +190,10 @@ func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.
syncForConfig := aggregatedMeshCtxs.Hash != d.lastHash
var syncForCert bool
for _, mesh := range aggregatedMeshCtxs.Meshes {
certInfo := d.EnvoyCpCtx.Secrets.Info(core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name})
certInfo := d.EnvoyCpCtx.Secrets.Info(
mesh_proto.IngressProxyType,
core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name},
)
syncForCert = syncForCert || (certInfo != nil && certInfo.ExpiringSoon()) // check if we need to regenerate config because identity cert is expiring soon.
}
if !syncForConfig && !syncForCert {
Expand Down Expand Up @@ -245,7 +251,10 @@ func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.D
syncForConfig := aggregatedMeshCtxs.Hash != d.lastHash
var syncForCert bool
for _, mesh := range aggregatedMeshCtxs.Meshes {
certInfo := d.EnvoyCpCtx.Secrets.Info(core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name})
certInfo := d.EnvoyCpCtx.Secrets.Info(
mesh_proto.EgressProxyType,
core_model.ResourceKey{Mesh: mesh.GetMeta().GetName(), Name: d.key.Name},
)
syncForCert = syncForCert || (certInfo != nil && certInfo.ExpiringSoon()) // check if we need to regenerate config because identity cert is expiring soon.
}
if !syncForConfig && !syncForCert {
Expand Down

0 comments on commit faf4649

Please sign in to comment.