Skip to content

Commit

Permalink
Merge pull request #117 from ryanzhang-oss/add-join-leave
Browse files Browse the repository at this point in the history
feat: add work agent join/leave
  • Loading branch information
Fei-Guo authored Sep 14, 2022
2 parents ba21e65 + b69a93e commit 9f2434d
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 44 deletions.
10 changes: 9 additions & 1 deletion cmd/workcontroller/workcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,18 @@ func main() {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

if err := controllers.Start(ctrl.SetupSignalHandler(), hubConfig, ctrl.GetConfigOrDie(), setupLog, opts); err != nil {
ctx := ctrl.SetupSignalHandler()
hubMgr, _, err := controllers.CreateControllers(ctx, hubConfig, ctrl.GetConfigOrDie(), setupLog, opts)
if err != nil {
setupLog.Error(err, "problem running controllers")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

klog.Info("starting hub manager")
defer klog.Info("shutting down hub manager")
if err := hubMgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running hub manager")
}
}

func getKubeConfig(hubkubeconfig string) (*restclient.Config, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/controllers/applied_work_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo
}
}
if !resStillExist {
klog.V(5).InfoS("find an orphaned resource in the member cluster",
klog.V(2).InfoS("find an orphaned resource in the member cluster",
"parent resource", work.GetName(), "orphaned resource", resourceMeta.ResourceIdentifier)
staleRes = append(staleRes, resourceMeta)
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo
}
}
if !resRecorded {
klog.V(5).InfoS("discovered a new manifest resource",
klog.V(2).InfoS("discovered a new manifest resource",
"parent Work", work.GetName(), "manifest", manifestCond.Identifier)
obj, err := r.spokeDynamicClient.Resource(schema.GroupVersionResource{
Group: manifestCond.Identifier.Group,
Expand All @@ -86,7 +86,7 @@ func (r *ApplyWorkReconciler) generateDiff(ctx context.Context, work *workapi.Wo
}).Namespace(manifestCond.Identifier.Namespace).Get(ctx, manifestCond.Identifier.Name, metav1.GetOptions{})
switch {
case apierrors.IsNotFound(err):
klog.V(4).InfoS("the new manifest resource is already deleted", "parent Work", work.GetName(), "manifest", manifestCond.Identifier)
klog.V(2).InfoS("the new manifest resource is already deleted", "parent Work", work.GetName(), "manifest", manifestCond.Identifier)
continue
case err != nil:
klog.ErrorS(err, "failed to retrieve the manifest", "parent Work", work.GetName(), "manifest", manifestCond.Identifier)
Expand Down Expand Up @@ -134,7 +134,7 @@ func (r *ApplyWorkReconciler) deleteStaleManifest(ctx context.Context, staleMani
}
}
if !found {
klog.V(4).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner)
klog.V(2).InfoS("the stale manifest is not owned by this work, skip", "manifest", staleManifest, "owner", owner)
continue
}
if len(newOwners) == 0 {
Expand Down
65 changes: 53 additions & 12 deletions pkg/controllers/apply_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ import (

const (
workFieldManagerName = "work-api-agent"

AnnotationLastAppliedManifest = "fleet.azure.com/last-applied-manifest"
)

// ApplyWorkReconciler reconciles a Work object
Expand All @@ -59,18 +57,20 @@ type ApplyWorkReconciler struct {
restMapper meta.RESTMapper
recorder record.EventRecorder
concurrency int
workNameSpace string
joined bool
}

func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client, restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, joined bool) *ApplyWorkReconciler {
func NewApplyWorkReconciler(hubClient client.Client, spokeDynamicClient dynamic.Interface, spokeClient client.Client,
restMapper meta.RESTMapper, recorder record.EventRecorder, concurrency int, workNameSpace string) *ApplyWorkReconciler {
return &ApplyWorkReconciler{
client: hubClient,
spokeDynamicClient: spokeDynamicClient,
spokeClient: spokeClient,
restMapper: restMapper,
recorder: recorder,
concurrency: concurrency,
joined: joined,
workNameSpace: workNameSpace,
}
}

Expand All @@ -85,7 +85,7 @@ type applyResult struct {
// Reconcile implement the control loop logic for Work object.
func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if !r.joined {
klog.V(3).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName)
klog.V(2).InfoS("work controller is not started yet, requeue the request", "work", req.NamespacedName)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
klog.InfoS("work apply controller reconcile loop triggered.", "work", req.NamespacedName)
Expand All @@ -95,7 +95,7 @@ func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
err := r.client.Get(ctx, req.NamespacedName, work)
switch {
case apierrors.IsNotFound(err):
klog.V(4).InfoS("the work resource is deleted", "work", req.NamespacedName)
klog.V(2).InfoS("the work resource is deleted", "work", req.NamespacedName)
return ctrl.Result{}, nil
case err != nil:
klog.ErrorS(err, "failed to retrieve the work", "work", req.NamespacedName)
Expand Down Expand Up @@ -150,7 +150,7 @@ func (r *ApplyWorkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// we can't proceed to update the applied
return ctrl.Result{}, err
} else if len(staleRes) > 0 {
klog.V(3).InfoS("successfully garbage-collected all stale manifests", work.Kind, kLogObjRef, "number of GCed res", len(staleRes))
klog.V(2).InfoS("successfully garbage-collected all stale manifests", work.Kind, kLogObjRef, "number of GCed res", len(staleRes))
for _, res := range staleRes {
klog.V(5).InfoS("successfully garbage-collected a stale manifest", work.Kind, kLogObjRef, "res", res)
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (r *ApplyWorkReconciler) garbageCollectAppliedWork(ctx context.Context, wor
err := r.spokeClient.Delete(ctx, &appliedWork, &client.DeleteOptions{PropagationPolicy: &deletePolicy})
switch {
case apierrors.IsNotFound(err):
klog.V(4).InfoS("the appliedWork is already deleted", "appliedWork", work.Name)
klog.V(2).InfoS("the appliedWork is already deleted", "appliedWork", work.Name)
case err != nil:
klog.ErrorS(err, "failed to delete the appliedWork", "appliedWork", work.Name)
return ctrl.Result{}, err
Expand Down Expand Up @@ -273,9 +273,9 @@ func (r *ApplyWorkReconciler) applyManifests(ctx context.Context, manifests []wo
if result.err == nil {
result.generation = appliedObj.GetGeneration()
if result.updated {
klog.V(4).InfoS("manifest upsert succeeded", "gvr", gvr, "manifest", kLogObjRef, "new ObservedGeneration", result.generation)
klog.V(2).InfoS("manifest upsert succeeded", "gvr", gvr, "manifest", kLogObjRef, "new ObservedGeneration", result.generation)
} else {
klog.V(5).InfoS("manifest upsert unwarranted", "gvr", gvr, "manifest", kLogObjRef)
klog.V(2).InfoS("manifest upsert unwarranted", "gvr", gvr, "manifest", kLogObjRef)
}
} else {
klog.ErrorS(result.err, "manifest upsert failed", "gvr", gvr, "manifest", kLogObjRef)
Expand Down Expand Up @@ -322,7 +322,7 @@ func (r *ApplyWorkReconciler) applyUnstructured(ctx context.Context, gvr schema.
actual, err := r.spokeDynamicClient.Resource(gvr).Namespace(manifestObj.GetNamespace()).Create(
ctx, manifestObj, metav1.CreateOptions{FieldManager: workFieldManagerName})
if err == nil {
klog.V(4).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef)
klog.V(2).InfoS("successfully created the manifest", "gvr", gvr, "manifest", manifestRef)
return actual, true, nil
}
return nil, false, err
Expand Down Expand Up @@ -394,7 +394,7 @@ func (r *ApplyWorkReconciler) patchCurrentResource(ctx context.Context, gvr sche
klog.ErrorS(patchErr, "failed to patch the manifest", "gvr", gvr, "manifest", manifestRef)
return nil, false, patchErr
}
klog.V(3).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef)
klog.V(2).InfoS("manifest patch succeeded", "gvr", gvr, "manifest", manifestRef)
return manifestObj, true, nil
}

Expand Down Expand Up @@ -426,6 +426,47 @@ func (r *ApplyWorkReconciler) generateWorkCondition(results []applyResult, work
return errs
}

// Join flips the reconciler into the joined state. Reconcile requeues every
// request while the joined flag is false, so calling Join is what lets work
// objects actually get processed. The transition is logged only when the
// reconciler was not already joined. It always returns nil.
func (r *ApplyWorkReconciler) Join(ctx context.Context) error {
	wasJoined := r.joined
	r.joined = true
	if !wasJoined {
		klog.InfoS("mark the apply work reconciler joined")
	}
	return nil
}

// Leave marks the reconciler as no longer joined and then strips the work
// finalizer from every Work object found in the reconciler's work namespace.
//
// It returns the list error (with NotFound ignored) when the Work objects
// cannot be listed, or the first update error it hits, in which case the
// remaining Work objects are left untouched. It returns nil once every
// finalizer has been removed.
func (r *ApplyWorkReconciler) Leave(ctx context.Context) error {
	if r.joined {
		klog.InfoS("mark the apply work reconciler left")
	}
	r.joined = false

	// list all the work objects we created in the member cluster namespace
	var works workv1alpha1.WorkList
	if err := r.client.List(ctx, &works, client.InNamespace(r.workNameSpace)); err != nil {
		klog.ErrorS(err, "failed to list all the work object", "clusterNS", r.workNameSpace)
		return client.IgnoreNotFound(err)
	}

	// we leave the resources on the member cluster for now and only drop the
	// finalizers from the work objects themselves
	for i := range works.Items {
		staleWork := works.Items[i].DeepCopy()
		if !controllerutil.ContainsFinalizer(staleWork, workFinalizer) {
			// nothing to strip from this work
			continue
		}
		controllerutil.RemoveFinalizer(staleWork, workFinalizer)
		if updateErr := r.client.Update(ctx, staleWork, &client.UpdateOptions{}); updateErr != nil {
			klog.ErrorS(updateErr, "failed to remove the work finalizer from the work",
				"clusterNS", r.workNameSpace, "work", klog.KObj(staleWork))
			return updateErr
		}
	}

	klog.V(2).InfoS("successfully removed all the work finalizers in the cluster namespace",
		"clusterNS", r.workNameSpace, "number of work", len(works.Items))
	return nil
}

// SetupWithManager wires up the controller.
func (r *ApplyWorkReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
17 changes: 16 additions & 1 deletion pkg/controllers/apply_controller_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
)

// createWorkWithManifest creates a work given a manifest
func createWorkWithManifest(workNamespace string, manifest runtime.Object) *workv1alpha1.Work {
manifestCopy := manifest.DeepCopyObject()
newWork := workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work-" + utilrand.String(5),
Expand All @@ -27,7 +29,7 @@ func createWorkWithManifest(workNamespace string, manifest runtime.Object) *work
Workload: workv1alpha1.WorkloadTemplate{
Manifests: []workv1alpha1.Manifest{
{
RawExtension: runtime.RawExtension{Object: manifest},
RawExtension: runtime.RawExtension{Object: manifestCopy},
},
},
},
Expand Down Expand Up @@ -78,3 +80,16 @@ func waitForWorkToApply(workName, workNS string) *workv1alpha1.Work {
}, timeout, interval).Should(BeTrue())
return &resultWork
}

// waitForWorkToBeHandled polls for the named work until it can be fetched and
// carries the work finalizer, failing the Gomega assertion if that does not
// happen within timeout. It returns the last fetched copy of the work.
func waitForWorkToBeHandled(workName, workNS string) *workv1alpha1.Work {
	handledWork := &workv1alpha1.Work{}
	workKey := types.NamespacedName{Name: workName, Namespace: workNS}
	Eventually(func() bool {
		// a failed Get counts as "not handled yet" and is retried
		if err := k8sClient.Get(context.Background(), workKey, handledWork); err != nil {
			return false
		}
		return controllerutil.ContainsFinalizer(handledWork, workFinalizer)
	}, timeout, interval).Should(BeTrue())
	return handledWork
}
Loading

0 comments on commit 9f2434d

Please sign in to comment.