Commit 09055de
Merge pull request #112 from ryanzhang-oss/remove-work-status
feat: merge two controllers and rewrite test
2 parents edd54e5 + 8d93a51

10 files changed: +474 / -523 lines
Diff for: pkg/controllers/applied_work_syncer.go (new file, +144 lines)

@@ -0,0 +1,144 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
    "context"
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    utilerrors "k8s.io/apimachinery/pkg/util/errors"
    "k8s.io/klog/v2"

    workapi "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
)
// generateDiff checks the difference between what is supposed to be applied (tracked by the work CR status)
// and what was actually applied in the member cluster (tracked by the appliedWork CR).
// What is in the `appliedWork` but not in the `work` should be deleted from the member cluster,
// while what is in the `work` but not in the `appliedWork` should be added to the appliedWork status.
func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Work, appliedWork *workapi.AppliedWork) ([]workapi.AppliedResourceMeta, []workapi.AppliedResourceMeta, error) {
    var staleRes, newRes []workapi.AppliedResourceMeta
    // for every resource applied in the cluster, check if it is still in the work's manifest conditions
    for _, resourceMeta := range appliedWork.Status.AppliedResources {
        resStillExist := false
        for _, manifestCond := range work.Status.ManifestConditions {
            if resourceMeta.ResourceIdentifier == manifestCond.Identifier {
                resStillExist = true
                break
            }
        }
        if !resStillExist {
            klog.V(5).InfoS("found an orphaned resource in the member cluster",
                "parent resource", work.GetName(), "orphaned resource", resourceMeta.ResourceIdentifier)
            staleRes = append(staleRes, resourceMeta)
        }
    }
    // add every resource in the work's manifest conditions that was applied successfully back to the appliedWork status
    for _, manifestCond := range work.Status.ManifestConditions {
        ac := meta.FindStatusCondition(manifestCond.Conditions, ConditionTypeApplied)
        if ac == nil {
            // should not happen
            klog.ErrorS(fmt.Errorf("resource is missing applied condition"), "applied condition missing", "resource", manifestCond.Identifier)
            continue
        }
        // we only add the applied ones to the appliedWork status
        if ac.Status == metav1.ConditionTrue {
            resRecorded := false
            // we keep the existing resourceMeta since it has the UID
            for _, resourceMeta := range appliedWork.Status.AppliedResources {
                if resourceMeta.ResourceIdentifier == manifestCond.Identifier {
                    resRecorded = true
                    newRes = append(newRes, resourceMeta)
                    break
                }
            }
            if !resRecorded {
                klog.V(5).InfoS("discovered a new resource",
                    "parent Work", work.GetName(), "discovered resource", manifestCond.Identifier)
                obj, err := r.spokeDynamicClient.Resource(schema.GroupVersionResource{
                    Group:    manifestCond.Identifier.Group,
                    Version:  manifestCond.Identifier.Version,
                    Resource: manifestCond.Identifier.Resource,
                }).Namespace(manifestCond.Identifier.Namespace).Get(ctx, manifestCond.Identifier.Name, metav1.GetOptions{})
                switch {
                case apierrors.IsNotFound(err):
                    klog.V(4).InfoS("the manifest resource is deleted", "manifest", manifestCond.Identifier)
                    continue
                case err != nil:
                    klog.ErrorS(err, "failed to retrieve the manifest", "manifest", manifestCond.Identifier)
                    return nil, nil, err
                }
                newRes = append(newRes, workapi.AppliedResourceMeta{
                    ResourceIdentifier: manifestCond.Identifier,
                    UID:                obj.GetUID(),
                })
            }
        }
    }

    return newRes, staleRes, nil
}
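Note that the `resourceMeta.ResourceIdentifier == manifestCond.Identifier` comparison above is plain Go struct equality, so two records match only when every identifier field matches. A minimal sketch of the resulting set logic, using a hypothetical simplified identifier type (the real `workapi.ResourceIdentifier` carries more fields):

// Hypothetical illustration of generateDiff's set semantics; not part of this commit.
type simpleID struct{ Group, Version, Resource, Namespace, Name string }

// classifyStale returns the applied resources that are no longer desired.
func classifyStale(applied, desired []simpleID) []simpleID {
    want := make(map[simpleID]bool, len(desired))
    for _, d := range desired {
        want[d] = true
    }
    var stale []simpleID
    for _, a := range applied {
        if !want[a] { // applied earlier, no longer in the work: garbage-collect
            stale = append(stale, a)
        }
    }
    return stale
}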
func (r *ApplyWorkReconciler) deleteStaleManifest(ctx context.Context, staleManifests []workapi.AppliedResourceMeta, owner metav1.OwnerReference) error {
    var errs []error

    for _, staleManifest := range staleManifests {
        gvr := schema.GroupVersionResource{
            Group:    staleManifest.Group,
            Version:  staleManifest.Version,
            Resource: staleManifest.Resource,
        }
        uObj, err := r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace).
            Get(ctx, staleManifest.Name, metav1.GetOptions{})
        if err != nil {
            // the stale manifest may already be gone; only record unexpected errors
            if !apierrors.IsNotFound(err) {
                klog.ErrorS(err, "failed to retrieve the stale manifest", "manifest", staleManifest, "owner", owner)
                errs = append(errs, err)
            }
            continue
        }
        existingOwners := uObj.GetOwnerReferences()
        newOwners := make([]metav1.OwnerReference, 0)
        found := false
        for index, ref := range existingOwners {
            if isReferSameObject(ref, owner) {
                found = true
                newOwners = append(newOwners, existingOwners[:index]...)
                newOwners = append(newOwners, existingOwners[index+1:]...)
            }
        }
        if !found {
            klog.V(2).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner)
            continue
        }
        if len(newOwners) == 0 {
            klog.V(2).InfoS("delete the stale manifest", "manifest", staleManifest, "owner", owner)
            err := r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace).
                Delete(ctx, staleManifest.Name, metav1.DeleteOptions{})
            if err != nil && !apierrors.IsNotFound(err) {
                klog.ErrorS(err, "failed to delete the stale manifest", "manifest", staleManifest, "owner", owner)
                errs = append(errs, err)
            }
        } else {
            klog.V(2).InfoS("remove the owner reference from the stale manifest", "manifest", staleManifest, "owner", owner)
            uObj.SetOwnerReferences(newOwners)
            _, err := r.spokeDynamicClient.Resource(gvr).Namespace(staleManifest.Namespace).Update(ctx, uObj, metav1.UpdateOptions{FieldManager: workFieldManagerName})
            if err != nil {
                klog.ErrorS(err, "failed to remove the owner reference from the manifest", "manifest", staleManifest, "owner", owner)
                errs = append(errs, err)
            }
        }
    }
    return utilerrors.NewAggregate(errs)
}
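Both `isReferSameObject` and `workFieldManagerName` are defined elsewhere in the controllers package and are not part of this file. As a rough sketch, an owner-reference comparison along these lines would match on group (ignoring version), kind, and name; this is an assumption for illustration, not the repository's actual helper:

// Hypothetical sketch of an owner-reference equality check; the package's real
// isReferSameObject may compare a different set of fields.
func isReferSameObjectSketch(a, b metav1.OwnerReference) bool {
    aGV, err := schema.ParseGroupVersion(a.APIVersion)
    if err != nil {
        return false
    }
    bGV, err := schema.ParseGroupVersion(b.APIVersion)
    if err != nil {
        return false
    }
    // Two references point at the same object if the group (version ignored),
    // kind, and name all match.
    return aGV.Group == bGV.Group && a.Kind == b.Kind && a.Name == b.Name
}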
Diff for: a new test file (+228 lines)

@@ -0,0 +1,228 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controllers
18+
19+
import (
20+
"context"
21+
22+
. "github.com/onsi/ginkgo/v2"
23+
. "github.com/onsi/gomega"
24+
corev1 "k8s.io/api/core/v1"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/types"
29+
utilrand "k8s.io/apimachinery/pkg/util/rand"
30+
31+
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
32+
)
33+
34+
var _ = Describe("Work Status Reconciler", func() {
    var resourceNamespace string
    var workNamespace string
    var work *workv1alpha1.Work
    var cm, cm2 *corev1.ConfigMap

    var wns corev1.Namespace
    var rns corev1.Namespace

    BeforeEach(func() {
        workNamespace = "cluster-" + utilrand.String(5)
        resourceNamespace = utilrand.String(5)

        wns = corev1.Namespace{
            ObjectMeta: metav1.ObjectMeta{
                Name: workNamespace,
            },
        }
        err := k8sClient.Create(context.Background(), &wns)
        Expect(err).ToNot(HaveOccurred())

        rns = corev1.Namespace{
            ObjectMeta: metav1.ObjectMeta{
                Name: resourceNamespace,
            },
        }
        err = k8sClient.Create(context.Background(), &rns)
        Expect(err).ToNot(HaveOccurred())

        // Create the Work object with some type of Manifest resource.
        cm = &corev1.ConfigMap{
            TypeMeta: metav1.TypeMeta{
                APIVersion: "v1",
                Kind:       "ConfigMap",
            },
            ObjectMeta: metav1.ObjectMeta{
                Name:      "configmap-" + utilrand.String(5),
                Namespace: resourceNamespace,
            },
            Data: map[string]string{
                "test": "test",
            },
        }
        cm2 = &corev1.ConfigMap{
            TypeMeta: metav1.TypeMeta{
                APIVersion: "v1",
                Kind:       "ConfigMap",
            },
            ObjectMeta: metav1.ObjectMeta{
                Name:      "configmap2-" + utilrand.String(5),
                Namespace: resourceNamespace,
            },
            Data: map[string]string{
                "test": "test",
            },
        }

        By("Create work that contains two configMaps")
        work = &workv1alpha1.Work{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "work-" + utilrand.String(5),
                Namespace: workNamespace,
            },
            Spec: workv1alpha1.WorkSpec{
                Workload: workv1alpha1.WorkloadTemplate{
                    Manifests: []workv1alpha1.Manifest{
                        {
                            RawExtension: runtime.RawExtension{Object: cm},
                        },
                        {
                            RawExtension: runtime.RawExtension{Object: cm2},
                        },
                    },
                },
            },
        }
    })

    AfterEach(func() {
        // TODO: Ensure that all resources are being deleted.
        Expect(k8sClient.Delete(context.Background(), work)).Should(Succeed())
        Expect(k8sClient.Delete(context.Background(), &wns)).Should(Succeed())
        Expect(k8sClient.Delete(context.Background(), &rns)).Should(Succeed())
    })
    It("Should delete the manifest from the member cluster after it is removed from work", func() {
        By("Apply the work")
        Expect(k8sClient.Create(context.Background(), work)).ToNot(HaveOccurred())

        By("Make sure that the work is applied")
        currentWork := waitForWorkToApply(work.Name, workNamespace)
        var appliedWork workv1alpha1.AppliedWork
        Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed())
        Expect(len(appliedWork.Status.AppliedResources)).Should(Equal(2))

        By("Remove configMap 2 from the work")
        currentWork.Spec.Workload.Manifests = []workv1alpha1.Manifest{
            {
                RawExtension: runtime.RawExtension{Object: cm},
            },
        }
        Expect(k8sClient.Update(context.Background(), currentWork)).Should(Succeed())

        By("Verify that the resource is removed from the cluster")
        Eventually(func() bool {
            var configMap corev1.ConfigMap
            return apierrors.IsNotFound(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap))
        }, timeout, interval).Should(BeTrue())

        By("Verify that the appliedWork status is correct")
        Eventually(func() bool {
            Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed())
            return len(appliedWork.Status.AppliedResources) == 1
        }, timeout, interval).Should(BeTrue())
        Expect(appliedWork.Status.AppliedResources[0].Name).Should(Equal(cm.GetName()))
        Expect(appliedWork.Status.AppliedResources[0].Namespace).Should(Equal(cm.GetNamespace()))
        Expect(appliedWork.Status.AppliedResources[0].Version).Should(Equal(cm.GetObjectKind().GroupVersionKind().Version))
        Expect(appliedWork.Status.AppliedResources[0].Group).Should(Equal(cm.GetObjectKind().GroupVersionKind().Group))
        Expect(appliedWork.Status.AppliedResources[0].Kind).Should(Equal(cm.GetObjectKind().GroupVersionKind().Kind))
    })
    It("Should only delete the manifest from the member cluster after it is removed from all works", func() {
        By("Create another work that contains the same two configMaps")
        work2 := work.DeepCopy()
        work2.Name = "work-" + utilrand.String(5)
        Expect(k8sClient.Create(context.Background(), work)).ToNot(HaveOccurred())
        Expect(k8sClient.Create(context.Background(), work2)).ToNot(HaveOccurred())

        By("Make sure that the appliedWork is updated")
        var appliedWork, appliedWork2 workv1alpha1.AppliedWork
        Eventually(func() bool {
            err := k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)
            if err != nil {
                return false
            }
            return len(appliedWork.Status.AppliedResources) == 2
        }, timeout, interval).Should(BeTrue())

        By("Make sure that the appliedWork2 is updated")
        Eventually(func() bool {
            err := k8sClient.Get(context.Background(), types.NamespacedName{Name: work2.Name}, &appliedWork2)
            if err != nil {
                return false
            }
            return len(appliedWork2.Status.AppliedResources) == 2
        }, timeout, interval).Should(BeTrue())

        By("Remove configMap 2 from the work")
        currentWork := waitForWorkToApply(work.Name, workNamespace)
        currentWork.Spec.Workload.Manifests = []workv1alpha1.Manifest{
            {
                RawExtension: runtime.RawExtension{Object: cm},
            },
        }
        Expect(k8sClient.Update(context.Background(), currentWork)).Should(Succeed())
        currentWork = waitForWorkToApply(work.Name, workNamespace)
        Expect(len(currentWork.Status.ManifestConditions)).Should(Equal(1))

        By("Verify that configMap 2 is removed from the appliedWork")
        Eventually(func() bool {
            Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work.Name}, &appliedWork)).Should(Succeed())
            return len(appliedWork.Status.AppliedResources) == 1
        }, timeout, interval).Should(BeTrue())

        By("Verify that configMap 2 is not removed from the cluster while work2 still owns it")
        Consistently(func() bool {
            var configMap corev1.ConfigMap
            return k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap) == nil
        }, timeout, interval).Should(BeTrue())

        By("Remove configMap 2 from the work2")
        currentWork = waitForWorkToApply(work2.Name, workNamespace)
        currentWork.Spec.Workload.Manifests = []workv1alpha1.Manifest{
            {
                RawExtension: runtime.RawExtension{Object: cm},
            },
        }
        Expect(k8sClient.Update(context.Background(), currentWork)).Should(Succeed())
        currentWork = waitForWorkToApply(work2.Name, workNamespace)
        Expect(len(currentWork.Status.ManifestConditions)).Should(Equal(1))

        By("Verify that the resource is removed from the appliedWork2")
        Eventually(func() bool {
            Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: work2.Name}, &appliedWork2)).Should(Succeed())
            return len(appliedWork2.Status.AppliedResources) == 1
        }, timeout, interval).Should(BeTrue())

        By("Verify that cm2 is removed from the cluster")
        Eventually(func() bool {
            var configMap corev1.ConfigMap
            return apierrors.IsNotFound(k8sClient.Get(context.Background(), types.NamespacedName{Name: cm2.Name, Namespace: resourceNamespace}, &configMap))
        }, timeout, interval).Should(BeTrue())
    })

})
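The tests above rely on a `waitForWorkToApply` helper that is not part of this diff. Presumably it polls the Work until its Applied condition turns true and returns the refreshed object; a rough sketch under that assumption, reusing the suite's `k8sClient`, `timeout`, and `interval` plus `meta` from k8s.io/apimachinery/pkg/api/meta:

// Hypothetical sketch of the suite helper assumed by the tests above; the real
// helper may also check the per-manifest applied conditions.
func waitForWorkToApplySketch(workName, workNamespace string) *workv1alpha1.Work {
    var resultWork workv1alpha1.Work
    Eventually(func() bool {
        if err := k8sClient.Get(context.Background(),
            types.NamespacedName{Name: workName, Namespace: workNamespace}, &resultWork); err != nil {
            return false
        }
        applied := meta.FindStatusCondition(resultWork.Status.Conditions, ConditionTypeApplied)
        return applied != nil && applied.Status == metav1.ConditionTrue
    }, timeout, interval).Should(BeTrue())
    return &resultWork
}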
