Skip to content

Commit 30d43f7

Browse files
authored
fix(controller): delete all pods when removing workload and improve finalizer logic (#159)
* feat(controller): delete all pods when removing workload and improve finalizer logic
* fix lint
* fix: workload may not exist, should check it in cleanup.
1 parent 3851c7c commit 30d43f7

File tree

3 files changed

+59
-4
lines changed

3 files changed

+59
-4
lines changed

internal/controller/tensorfusionworkload_controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/NexusGPU/tensor-fusion/internal/utils"
4242
"github.com/NexusGPU/tensor-fusion/internal/worker"
4343
"github.com/prometheus/client_golang/prometheus"
44+
"github.com/samber/lo"
4445
)
4546

4647
// TensorFusionWorkloadReconciler reconciles a TensorFusionWorkload object
@@ -80,6 +81,17 @@ func (r *TensorFusionWorkloadReconciler) Reconcile(ctx context.Context, req ctrl
8081
}
8182

8283
shouldReturn, err := utils.HandleFinalizer(ctx, workload, r.Client, func(ctx context.Context, _ *tfv1.TensorFusionWorkload) (bool, error) {
84+
// delete all pods
85+
existsPods := lo.Filter(podList.Items, func(pod corev1.Pod, _ int) bool {
86+
return pod.DeletionTimestamp == nil
87+
})
88+
if len(existsPods) > 0 {
89+
if err := r.DeleteAllOf(ctx, &corev1.Pod{},
90+
client.InNamespace(req.Namespace),
91+
client.MatchingLabels{constants.WorkloadKey: workload.Name}); err != nil {
92+
return false, fmt.Errorf("delete pods: %w", err)
93+
}
94+
}
8395
// check if all pods are deleted
8496
return len(podList.Items) == 0, nil
8597
})

internal/controller/tensorfusionworkload_controller_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
. "github.com/onsi/gomega"
2525
"github.com/samber/lo"
2626
corev1 "k8s.io/api/core/v1"
27+
"k8s.io/apimachinery/pkg/api/errors"
2728
"k8s.io/apimachinery/pkg/api/resource"
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -184,6 +185,44 @@ var _ = Describe("TensorFusionWorkload Controller", func() {
184185
})
185186
})
186187

188+
Context("When deleting workload directly", func() {
189+
It("Should delete all pods and the workload itself", func() {
190+
pool := tfEnv.GetGPUPool(0)
191+
192+
workload := createTensorFusionWorkload(pool.Name, key, 2)
193+
checkWorkerPodCount(workload)
194+
checkWorkloadStatus(workload)
195+
196+
// wait for 2 pods to be created
197+
Eventually(func(g Gomega) {
198+
podList := &corev1.PodList{}
199+
g.Expect(k8sClient.List(ctx, podList,
200+
client.InNamespace(key.Namespace),
201+
client.MatchingLabels{constants.WorkloadKey: key.Name})).To(Succeed())
202+
g.Expect(podList.Items).To(HaveLen(2))
203+
}, timeout, interval).Should(Succeed())
204+
205+
// delete workload
206+
Expect(k8sClient.Delete(ctx, workload)).To(Succeed())
207+
208+
// wait for all pods to be deleted
209+
Eventually(func(g Gomega) {
210+
podList := &corev1.PodList{}
211+
g.Expect(k8sClient.List(ctx, podList,
212+
client.InNamespace(key.Namespace),
213+
client.MatchingLabels{constants.WorkloadKey: key.Name})).To(Succeed())
214+
g.Expect(podList.Items).Should(BeEmpty())
215+
}, timeout, interval).Should(Succeed())
216+
217+
// wait for workload itself to be deleted
218+
Eventually(func(g Gomega) {
219+
w := &tfv1.TensorFusionWorkload{}
220+
err := k8sClient.Get(ctx, key, w)
221+
g.Expect(err).To(HaveOccurred())
222+
}, timeout, interval).Should(Succeed())
223+
})
224+
})
225+
187226
Context("When GPUPool doesn't exist", func() {
188227
It("Should not create worker pod when reconciling a workload with non-existent pool", func() {
189228
workload := createTensorFusionWorkload("non-existent-pool", key, 1)
@@ -262,6 +301,13 @@ func cleanupWorkload(key client.ObjectKey) {
262301
GinkgoHelper()
263302
workload := &tfv1.TensorFusionWorkload{}
264303

304+
if err := k8sClient.Get(ctx, key, workload); err != nil {
305+
if errors.IsNotFound(err) {
306+
return
307+
}
308+
Expect(err).To(HaveOccurred())
309+
}
310+
265311
// Set replicas to 0
266312
Eventually(func(g Gomega) {
267313
g.Expect(k8sClient.Get(ctx, key, workload)).Should(Succeed())

internal/utils/reconcile.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,8 @@ func HandleFinalizer[T client.Object](
6060
shouldReturn = true
6161
return shouldReturn, err
6262
}
63-
// Cleanup not ready, wait for next reconcile
64-
shouldReturn = true
65-
return shouldReturn, err
6663
}
67-
// Finalizer already removed, continue with deletion
64+
// continue with deletion logic
6865
shouldReturn = false
6966
return shouldReturn, err
7067
}

0 commit comments

Comments
 (0)