Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: show namespaced resources owned by cluster scoped resources #366

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
113 changes: 110 additions & 3 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)
type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool)

// OnResourceUpdatedHandler handlers resource update event
type OnResourceUpdatedHandler func(newRes *Resource, oldRes *Resource, namespaceResources map[kube.ResourceKey]*Resource)
type OnResourceUpdatedHandler func(newRes *Resource, oldRes *Resource, resources map[kube.ResourceKey]*Resource)
type Unsubscribe func()

type ClusterCache interface {
Expand Down Expand Up @@ -153,6 +153,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
resources: make(map[kube.ResourceKey]*Resource),
nsIndex: make(map[string]map[kube.ResourceKey]*Resource),
childrenByParent: make(map[kube.ResourceKey][]kube.ResourceKey),
config: config,
kubectl: &kube.KubectlCmd{
Log: log,
Expand Down Expand Up @@ -206,6 +207,13 @@ type clusterCache struct {
lock sync.RWMutex
resources map[kube.ResourceKey]*Resource
nsIndex map[string]map[kube.ResourceKey]*Resource
// childrenByParent maps a parent ref to the list of child refs
childrenByParent map[kube.ResourceKey][]kube.ResourceKey

// Set showClusterResourceChildren to true to show namespace scoped
// resources owned by cluster scoped resources. By default, this feature is
// disabled due to performance implications.
showClusterResourceChildren bool

kubectl kube.Kubectl
log logr.Logger
Expand Down Expand Up @@ -397,6 +405,7 @@ func (c *clusterCache) newResource(un *unstructured.Unstructured) *Resource {
}

func (c *clusterCache) setNode(n *Resource) {
c.updateChildrenByParentMap(n)
key := n.ResourceKey()
c.resources[key] = n
ns, ok := c.nsIndex[key.Namespace]
Expand Down Expand Up @@ -968,12 +977,29 @@ func (c *clusterCache) FindResources(namespace string, predicates ...func(r *Res
return result
}

func (c *clusterCache) addChildResourcesRecursivly(nsNodes map[kube.ResourceKey]*Resource, key kube.ResourceKey) map[kube.ResourceKey]*Resource {
nsNodeReturned := nsNodes
childRefs := c.childrenByParent[key]
childResources := map[kube.ResourceKey]*Resource{}
for _, childRef := range childRefs {
if c.resources[childRef] != nil {
childResources[childRef] = c.resources[childRef]
nsNodeReturned = c.addChildResourcesRecursivly(nsNodeReturned, childRef)
}
}
// Merge the maps into a different copy to avoid updating the original nsIndex map
return mergeResourceMaps(nsNodeReturned, childResources)
}

// IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree
func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) {
c.lock.RLock()
defer c.lock.RUnlock()
if res, ok := c.resources[key]; ok {
nsNodes := c.nsIndex[key.Namespace]
if key.Namespace == "" && c.showClusterResourceChildren {
nsNodes = c.addChildResourcesRecursivly(nsNodes, key)
}
if !action(res, nsNodes) {
return
}
Expand Down Expand Up @@ -1136,11 +1162,25 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
c.setNode(newRes)
for _, h := range c.getResourceUpdatedHandlers() {
h(newRes, oldRes, c.nsIndex[newRes.Ref.Namespace])
h(newRes, oldRes, getNSResourcesWithClusterReources(c, newRes.Ref.Namespace))
}
}

// Returns all resources in a particular namespace. It also includes cluster scoped resources
// if showClusterResourceChildren is enabled
func getNSResourcesWithClusterReources(c *clusterCache, ns string) map[kube.ResourceKey]*Resource {
resources := c.nsIndex[ns]
if c.showClusterResourceChildren {
// We include cluster scoped resources because a namespace scoped resource could be owned
// either by a cluster scoped or a namespace scoped parent.
resources = mergeResourceMaps(resources, c.nsIndex[""])
}

return resources
}

func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
c.removeFromChildrenByParentMap(key)
existing, ok := c.resources[key]
if ok {
delete(c.resources, key)
Expand All @@ -1150,6 +1190,7 @@ func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
if len(ns) == 0 {
delete(c.nsIndex, key.Namespace)
}

// remove ownership references from children with inferred references
if existing.isInferredParentOf != nil {
for k, v := range ns {
Expand All @@ -1160,7 +1201,73 @@ func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
}
}
for _, h := range c.getResourceUpdatedHandlers() {
h(nil, existing, ns)
h(nil, existing, getNSResourcesWithClusterReources(c, key.Namespace))
}
}
}

// mergeResourceMaps merges the given maps into a different copy in order to avoid updates
// to the original maps
func mergeResourceMaps(a, b map[kube.ResourceKey]*Resource) map[kube.ResourceKey]*Resource {
mergedMap := map[kube.ResourceKey]*Resource{}
for k, v := range a {
mergedMap[k] = v
}

for k, v := range b {
mergedMap[k] = v
}

return mergedMap
}

func (c *clusterCache) updateChildrenByParentMap(res *Resource) {
if res == nil || !c.showClusterResourceChildren {
return
}
childKey := res.ResourceKey()
if _, ok := c.childrenByParent[childKey]; !ok {
c.childrenByParent[childKey] = []kube.ResourceKey{}
}
for _, parent := range res.OwnerRefs {
parentGvk := schema.FromAPIVersionAndKind(parent.APIVersion, parent.Kind)
var namespace string
if isNamespaced, _ := c.IsNamespaced(parentGvk.GroupKind()); isNamespaced {
namespace = res.Ref.Namespace
}
parentKey := kube.NewResourceKey(parentGvk.Group, parentGvk.Kind, namespace, parent.Name)
childRefs, ok := c.childrenByParent[parentKey]
if !ok {
c.childrenByParent[parentKey] = []kube.ResourceKey{}
}
exists := false
for _, cr := range childRefs {
if cr == childKey {
exists = true
}
}
if !exists {
childRefs = append(childRefs, childKey)
}
c.childrenByParent[parentKey] = childRefs
}
}

func (c *clusterCache) removeFromChildrenByParentMap(key kube.ResourceKey) {
if !c.showClusterResourceChildren {
return
}

delete(c.childrenByParent, key)

for k, childRefs := range c.childrenByParent {
for i, childRef := range childRefs {
childRefKey := kube.NewResourceKey(childRef.Group, childRef.Kind, childRef.Namespace, childRef.Name)
if childRefKey == key {
// remove the childRef that matches the deleted resource
childRefs[i] = childRefs[len(childRefs)-1]
c.childrenByParent[k] = childRefs[:len(childRefs)-1]
}
}
}
}
Expand Down
Loading
Loading