Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ migration's migrating status #1474

Merged
merged 16 commits into from
Mar 25, 2025
2 changes: 1 addition & 1 deletion agent/pkg/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func AddToManager(context context.Context, mgr ctrl.Manager, transportClient tra
dispatcher.RegisterSyncer(constants.CloudEventTypeMigrationFrom,
syncers.NewManagedClusterMigrationFromSyncer(mgr.GetClient(), transportClient))
dispatcher.RegisterSyncer(constants.CloudEventTypeMigrationTo,
syncers.NewManagedClusterMigrationToSyncer(mgr.GetClient()))
syncers.NewManagedClusterMigrationToSyncer(mgr.GetClient(), transportClient))
dispatcher.RegisterSyncer(constants.ResyncMsgKey, syncers.NewResyncer())

log.Info("added the spec controllers to manager")
Expand Down
221 changes: 158 additions & 63 deletions agent/pkg/spec/syncers/migration_from_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ package syncers
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/go-kratos/kratos/v2/log"
klusterletv1alpha1 "github.com/stolostron/cluster-lifecycle-api/klusterletconfig/v1alpha1"
addonv1 "github.com/stolostron/klusterlet-addon-controller/pkg/apis/agent/v1"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clusterv1 "open-cluster-management.io/api/cluster/v1"
operatorv1 "open-cluster-management.io/api/operator/v1"
Expand All @@ -28,6 +30,12 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/enum"
"github.com/stolostron/multicluster-global-hub/pkg/logger"
"github.com/stolostron/multicluster-global-hub/pkg/transport"
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)

const (
klusterletConfigNamePrefix = "migration-"
bootstrapSecretNamePrefix = "bootstrap-"
)

// This is a temporary solution to wait for applying the klusterletconfig
Expand All @@ -44,7 +52,7 @@ func NewManagedClusterMigrationFromSyncer(client client.Client,
transportClient transport.TransportClient,
) *managedClusterMigrationFromSyncer {
return &managedClusterMigrationFromSyncer{
log: logger.ZapLogger("managed-cluster-migration-from-syncer"),
log: logger.DefaultZapLogger(),
client: client,
transportClient: transportClient,
bundleVersion: eventversion.NewVersion(),
Expand All @@ -59,8 +67,8 @@ func (s *managedClusterMigrationFromSyncer) Sync(ctx context.Context, payload []
}
s.log.Debugf("received managed cluster migration event %s", string(payload))

// send KlusterletAddonConfig to the global hub and then propogate to the target cluster
if managedClusterMigrationEvent.Stage == migrationv1alpha1.PhaseInitializing {
s.log.Infof("initializing managed cluster migration event")
managedClusters := managedClusterMigrationEvent.ManagedClusters
toHub := managedClusterMigrationEvent.ToHub
for _, managedCluster := range managedClusters {
Expand All @@ -71,59 +79,148 @@ func (s *managedClusterMigrationFromSyncer) Sync(ctx context.Context, payload []
return nil
}

// TODO: deprecated in the migrating stage
// create or update bootstrap secret
bootstrapSecret := managedClusterMigrationEvent.BootstrapSecret
if managedClusterMigrationEvent.Stage == migrationv1alpha1.PhaseMigrating {
s.log.Infof("registering managed cluster migration event")
if err := s.registering(ctx, managedClusterMigrationEvent); err != nil {
return err
}
return nil
}

if managedClusterMigrationEvent.Stage == migrationv1alpha1.PhaseCompleted {
s.log.Infof("completed managed cluster migration event")
if err := s.cleanup(ctx, managedClusterMigrationEvent); err != nil {
return err
}
return nil
}
return nil
}

func (m *managedClusterMigrationFromSyncer) cleanup(
ctx context.Context, migratingEvt *bundleevent.ManagedClusterMigrationFromEvent,
) error {
bootstrapSecret := migratingEvt.BootstrapSecret
// ensure bootstrap kubeconfig secret
foundBootstrapSecret := &corev1.Secret{}
if err := s.client.Get(ctx,
if err := m.client.Get(ctx,
types.NamespacedName{
Name: bootstrapSecret.Name,
Namespace: bootstrapSecret.Namespace,
}, foundBootstrapSecret); err != nil {
if apierrors.IsNotFound(err) {
s.log.Infof("creating bootstrap secret %s", bootstrapSecret.GetName())
if err := s.client.Create(ctx, bootstrapSecret); err != nil {
m.log.Infof("bootstrap secret %s is removed", bootstrapSecret.GetName())
} else {
return err
}
} else {
m.log.Infof("delete bootstrap secret %s", bootstrapSecret.GetName())
if err := m.client.Delete(ctx, bootstrapSecret); err != nil {
return err
}
}

// ensure klusterletconfig
klusterletConfig := &klusterletv1alpha1.KlusterletConfig{
ObjectMeta: metav1.ObjectMeta{
Name: klusterletConfigNamePrefix + migratingEvt.ToHub,
},
}
if err := m.client.Get(ctx, client.ObjectKeyFromObject(klusterletConfig), klusterletConfig); err != nil {
if apierrors.IsNotFound(err) {
m.log.Infof("klusterletConfig %s is removed", klusterletConfig.GetName())
} else {
return err
}
} else {
m.log.Infof("delete klusterletconfig secret %s", klusterletConfig.GetName())
if err := m.client.Delete(ctx, klusterletConfig); err != nil {
return err
}
}

m.log.Infof("detach clusters %v", migratingEvt.ManagedClusters)
if err := m.detachManagedClusters(ctx, migratingEvt.ManagedClusters); err != nil {
m.log.Errorf("failed to detach managed clusters: %v", err)
return err
}
return nil
}

func (m *managedClusterMigrationFromSyncer) registering(
ctx context.Context, migratingEvt *bundleevent.ManagedClusterMigrationFromEvent,
) error {
bootstrapSecret := migratingEvt.BootstrapSecret
// ensure bootstrap kubeconfig secret
foundBootstrapSecret := &corev1.Secret{}
if err := m.client.Get(ctx,
types.NamespacedName{
Name: bootstrapSecret.Name,
Namespace: bootstrapSecret.Namespace,
}, foundBootstrapSecret); err != nil {
if apierrors.IsNotFound(err) {
m.log.Infof("creating bootstrap secret %s", bootstrapSecret.GetName())
if err := m.client.Create(ctx, bootstrapSecret); err != nil {
return err
}
} else {
return err
}
} else {
// update the bootstrap secret if it already exists
s.log.Infof("updating bootstrap secret %s", bootstrapSecret.GetName())
if err := s.client.Update(ctx, bootstrapSecret); err != nil {
m.log.Infof("updating bootstrap secret %s", bootstrapSecret.GetName())
if err := m.client.Update(ctx, bootstrapSecret); err != nil {
return err
}
}

// create klusterlet config if it does not exist
klusterletConfig := managedClusterMigrationEvent.KlusterletConfig
// set the bootstrap kubeconfig secrets in klusterlet config
klusterletConfig.Spec.BootstrapKubeConfigs.LocalSecrets.KubeConfigSecrets = []operatorv1.KubeConfigSecret{
{
Name: bootstrapSecret.Name,
// ensure klusterletconfig
klusterletConfig := &klusterletv1alpha1.KlusterletConfig{
ObjectMeta: metav1.ObjectMeta{
Name: klusterletConfigNamePrefix + migratingEvt.ToHub,
},
Spec: klusterletv1alpha1.KlusterletConfigSpec{
BootstrapKubeConfigs: operatorv1.BootstrapKubeConfigs{
Type: operatorv1.LocalSecrets,
LocalSecrets: operatorv1.LocalSecretsConfig{
KubeConfigSecrets: []operatorv1.KubeConfigSecret{
{
Name: bootstrapSecret.Name,
},
},
},
},
},
}
foundKlusterletConfig := &klusterletv1alpha1.KlusterletConfig{}
if err := s.client.Get(ctx,
types.NamespacedName{
Name: klusterletConfig.Name,
}, foundKlusterletConfig); err != nil {
if err := m.client.Get(ctx, client.ObjectKeyFromObject(klusterletConfig), klusterletConfig); err != nil {
if apierrors.IsNotFound(err) {
s.log.Infof("creating klusterlet config %s", klusterletConfig.GetName())
s.log.Debugf("creating klusterlet config %v", klusterletConfig)
if err := s.client.Create(ctx, klusterletConfig); err != nil {
if err := m.client.Create(ctx, klusterletConfig); err != nil {
return err
}
} else {
return err
}
}
managedClusters := managedClusterMigrationEvent.ManagedClusters
// update managed cluster annotations to point to the new klusterlet config
containBootstrapSecret := false
kubeConfigSecrets := klusterletConfig.Spec.BootstrapKubeConfigs.LocalSecrets.KubeConfigSecrets
for _, kubeConfigSecret := range kubeConfigSecrets {
if kubeConfigSecret.Name == bootstrapSecret.Name {
containBootstrapSecret = true
}
}
if !containBootstrapSecret {
klusterletConfig.Spec.BootstrapKubeConfigs.LocalSecrets.KubeConfigSecrets = append(kubeConfigSecrets,
operatorv1.KubeConfigSecret{Name: bootstrapSecret.Name})
if err := m.client.Update(ctx, klusterletConfig); err != nil {
return err
}
}

// update managed cluster annotations to point to the new klusterletconfig
managedClusters := migratingEvt.ManagedClusters
for _, managedCluster := range managedClusters {
mc := &clusterv1.ManagedCluster{}
if err := s.client.Get(ctx, types.NamespacedName{
if err := m.client.Get(ctx, types.NamespacedName{
Name: managedCluster,
}, mc); err != nil {
return err
Expand All @@ -140,38 +237,28 @@ func (s *managedClusterMigrationFromSyncer) Sync(ctx context.Context, payload []
annotations["agent.open-cluster-management.io/klusterlet-config"] = klusterletConfig.Name
annotations[constants.ManagedClusterMigrating] = ""
mc.SetAnnotations(annotations)
if err := s.client.Update(ctx, mc); err != nil {
return err
}
}
// send KlusterletAddonConfig to the global hub and then propogate to the target cluster
for _, managedCluster := range managedClusters {
if err := s.sendKlusterletAddonConfig(ctx, managedCluster, ""); err != nil {
if err := m.client.Update(ctx, mc); err != nil {
return err
}
}

// wait for 10 seconds to ensure the klusterletconfig is applied and then trigger the migration
// right now, no condition indicates the klusterletconfig is applied
// ensure the bootstrap secret is propagated into the managed cluster
time.Sleep(sleepForApplying)

// set the hub accept client into false to trigger the re-registering
for _, managedCluster := range managedClusters {
mc := &clusterv1.ManagedCluster{}
if err := s.client.Get(ctx, types.NamespacedName{
if err := m.client.Get(ctx, types.NamespacedName{
Name: managedCluster,
}, mc); err != nil {
return err
}
mc.Spec.HubAcceptsClient = false
s.log.Infof("updating managedcluster %s to set HubAcceptsClient as false", mc.Name)
if err := s.client.Update(ctx, mc); err != nil {
m.log.Infof("updating managedcluster %s to set HubAcceptsClient as false", mc.Name)
if err := m.client.Update(ctx, mc); err != nil {
return err
}
}
time.Sleep(sleepForApplying)
if err := s.detachManagedClusters(ctx, managedClusters); err != nil {
s.log.Error(err, "failed to detach managed clusters")
return err
}

return nil
}
Expand All @@ -180,16 +267,15 @@ func (s *managedClusterMigrationFromSyncer) Sync(ctx context.Context, payload []
func (s *managedClusterMigrationFromSyncer) sendKlusterletAddonConfig(ctx context.Context,
managedCluster string, toHub string,
) error {
config := &addonv1.KlusterletAddonConfig{}
config := &addonv1.KlusterletAddonConfig{
ObjectMeta: metav1.ObjectMeta{Name: managedCluster, Namespace: managedCluster},
}
// send klusterletAddonConfig to global hub so that it can be transferred to the target cluster
if err := s.client.Get(ctx, types.NamespacedName{
Name: managedCluster,
Namespace: managedCluster,
}, config); err != nil {
if err := s.client.Get(ctx, client.ObjectKeyFromObject(config), config); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
return nil
log.Infof("klusterletAddonConfig %s doesn't exist", managedCluster)
}
// do cleanup
config.SetManagedFields(nil)
Expand All @@ -205,21 +291,30 @@ func (s *managedClusterMigrationFromSyncer) sendKlusterletAddonConfig(ctx contex
return fmt.Errorf("failed to marshal klusterletAddonConfig (%v) - %w", config, err)
}

s.bundleVersion.Incr()
e := cloudevents.NewEvent()
e.SetType(string(enum.KlusterletAddonConfigType))
return SendEvent(ctx, s.transportClient, string(enum.KlusterletAddonConfigType), configs.GetLeafHubName(),
toHub, payloadBytes, s.bundleVersion)
}

e.SetSource(configs.GetLeafHubName())
e.SetExtension(constants.CloudEventExtensionKeyClusterName, toHub)
e.SetExtension(eventversion.ExtVersion, s.bundleVersion.String())
_ = e.SetData(cloudevents.ApplicationJSON, payloadBytes)
if s.transportClient != nil {
if err := s.transportClient.GetProducer().SendEvent(ctx, e); err != nil {
return fmt.Errorf("failed to send klusterletAddonConfig back to the global hub, due to %v", err)
func SendEvent(
ctx context.Context,
transportClient transport.TransportClient,
eventType string,
source string,
clusterName string,
payloadBytes []byte,
version *eventversion.Version,
) error {
version.Incr()
e := utils.ToCloudEvent(eventType, source, clusterName, payloadBytes)
e.SetExtension(eventversion.ExtVersion, version.String())
if transportClient != nil {
if err := transportClient.GetProducer().SendEvent(ctx, e); err != nil {
return fmt.Errorf("failed to send event(%s) from %s to %s: %v", eventType, source, clusterName, err)
}
s.bundleVersion.Next()
version.Next()
return nil
}
return nil
return errors.New("transport client must not be nil")
}

func (s *managedClusterMigrationFromSyncer) detachManagedClusters(ctx context.Context, managedClusters []string) error {
Expand Down
Loading