diff --git a/controllers/sync.go b/controllers/sync.go
index b9ebfa806..c96a0fe11 100644
--- a/controllers/sync.go
+++ b/controllers/sync.go
@@ -15,6 +15,7 @@ import (
 	apiProxy "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/apiproxy"
 )
 
+// canUpdateComponent checks if a component matches any of the given selectors.
 func canUpdateComponent(selectors []ytv1.ComponentUpdateSelector, component ytv1.Component) bool {
 	for _, selector := range selectors {
 		if selector.Class != consts.ComponentClassUnspecified {
@@ -40,7 +41,7 @@ func canUpdateComponent(selectors []ytv1.ComponentUpdateSelector, component ytv1
 
 // chooseUpdatingComponents splits all the components into two groups: ones that can be updated and ones whose update is blocked.
 func chooseUpdatingComponents(spec ytv1.YtsaurusSpec, needUpdate []ytv1.Component, allComponents []ytv1.Component) (canUpdate []ytv1.Component, cannotUpdate []ytv1.Component) {
-	configuredSelectors := getEffectiveSelectors(spec)
+	configuredSelectors := getEffectiveUpdateSelectors(spec)
 
 	for _, component := range needUpdate {
 		upd := canUpdateComponent(configuredSelectors, component)
@@ -82,7 +83,8 @@ func needFullUpdate(needUpdate []ytv1.Component) bool {
 	return false
 }
 
-func getEffectiveSelectors(spec ytv1.YtsaurusSpec) []ytv1.ComponentUpdateSelector {
+// getEffectiveUpdateSelectors returns the effective update selectors based on the spec configuration.
+func getEffectiveUpdateSelectors(spec ytv1.YtsaurusSpec) []ytv1.ComponentUpdateSelector {
 	if spec.UpdatePlan != nil {
 		return spec.UpdatePlan
 	}
diff --git a/controllers/update_flow_steps.go b/controllers/update_flow_steps.go
index 7bcc0d403..3c8d5271f 100644
--- a/controllers/update_flow_steps.go
+++ b/controllers/update_flow_steps.go
@@ -109,6 +109,12 @@ func newFlowTree(head *flowStep) *flowTree {
 func (f *flowTree) execute(ctx context.Context, ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager) (bool, error) {
 	var err error
 	currentState := ytsaurus.GetUpdateState()
+	// If UpdateState is empty (initial state), default to UpdateStateNone.
+	// This happens when an update is triggered for the first time before SaveUpdateState() is called.
+	// Otherwise, we would get a nil pointer dereference when looking up f.index[""].
+ if currentState == "" { + currentState = ytv1.UpdateStateNone + } currentStep := f.index[currentState] // will execute one step at a time diff --git a/pkg/components/exec_node.go b/pkg/components/exec_node.go index e3510a8ca..dc4c243e0 100644 --- a/pkg/components/exec_node.go +++ b/pkg/components/exec_node.go @@ -90,7 +90,8 @@ func NewExecNode( func (n *ExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if ytv1.IsReadyToUpdateClusterState(n.ytsaurus.GetClusterState()) && (n.server.needUpdate() || n.sidecarConfigNeedsReload()) { + // Call needUpdate on self instead of n.server.needUpdate + if ytv1.IsReadyToUpdateClusterState(n.ytsaurus.GetClusterState()) && (n.needUpdate() || n.sidecarConfigNeedsReload()) { return SimpleStatus(SyncStatusNeedLocalUpdate), err } diff --git a/pkg/components/exec_node_base.go b/pkg/components/exec_node_base.go index 2f9d29610..df0f279e6 100644 --- a/pkg/components/exec_node_base.go +++ b/pkg/components/exec_node_base.go @@ -24,10 +24,70 @@ type baseExecNode struct { sidecarConfig *ConfigMapBuilder } +// addResourceList adds resources from newList to list, summing quantities for existing keys +func addResourceList(list, newList corev1.ResourceList) { + if newList == nil { + return + } + for name, quantity := range newList { + if value, ok := list[name]; ok { + value.Add(quantity) + list[name] = value + } else { + list[name] = quantity.DeepCopy() + } + } +} + +// addJobResourcesToResources adds JobResources to the given ResourceRequirements if conditions are met +func (n *baseExecNode) addJobResourcesToResources(resources *corev1.ResourceRequirements) { + if n.spec.JobResources == nil || n.criConfig.Isolated { + return + } + + // Ensure resource lists are initialized + if resources.Requests == nil { + resources.Requests = corev1.ResourceList{} + } + if resources.Limits == nil { + resources.Limits = corev1.ResourceList{} + } + + addResourceList(resources.Requests, n.spec.JobResources.Requests) + addResourceList(resources.Limits, n.spec.JobResources.Limits) +} + func (n *baseExecNode) Fetch(ctx context.Context) error { return resources.Fetch(ctx, n.server, n.sidecarConfig) } +// needUpdate overrides the server's needUpdate to use ExecNode-specific resource comparison +func (n *baseExecNode) needUpdate() bool { + return n.server.needUpdateWithSpecCheck(n.needStatefulSetSpecUpdate) +} + +// needStatefulSetSpecUpdate checks if resources have changed for ExecNode explicitly +// ExecNode adds JobResources to NodeResources, so we need to compute the expected total +func (n *baseExecNode) needStatefulSetSpecUpdate() bool { + oldSts := n.server.getStatefulSet().OldObject() + + if oldSts.GetResourceVersion() == "" { + return false + } + + if len(oldSts.Spec.Template.Spec.Containers) == 0 { + return false + } + oldContainer := oldSts.Spec.Template.Spec.Containers[0] + + // Compute expected resources (NodeResources + JobResources if not isolated) + expectedResources := n.spec.InstanceSpec.Resources.DeepCopy() + n.addJobResourcesToResources(expectedResources) + + // Compare expected with actual + return !resources.ResourceRequirementsEqual(oldContainer.Resources, *expectedResources) +} + func (n *baseExecNode) doBuildBase() error { statefulSet := n.server.buildStatefulSet() podSpec := &statefulSet.Spec.Template.Spec @@ -61,24 +121,8 @@ func (n *baseExecNode) doBuildBase() error { setContainerPrivileged(&podSpec.Containers[i]) } - if n.spec.JobResources != nil && !n.criConfig.Isolated { - // Pour job resources into node container if 
jobs are not isolated. - container := &podSpec.Containers[0] - - addResourceList := func(list, newList corev1.ResourceList) { - for name, quantity := range newList { - if value, ok := list[name]; ok { - value.Add(quantity) - list[name] = value - } else { - list[name] = quantity.DeepCopy() - } - } - } - - addResourceList(container.Resources.Requests, n.spec.JobResources.Requests) - addResourceList(container.Resources.Limits, n.spec.JobResources.Limits) - } + // Pour job resources into node container if jobs are not isolated. + n.addJobResourcesToResources(&podSpec.Containers[0].Resources) if n.criConfig.Service != ytv1.CRIServiceNone { if n.criConfig.Isolated { diff --git a/pkg/components/exec_node_remote.go b/pkg/components/exec_node_remote.go index e9e19e73f..bb8885e0a 100644 --- a/pkg/components/exec_node_remote.go +++ b/pkg/components/exec_node_remote.go @@ -78,7 +78,8 @@ func NewRemoteExecNodes( func (n *RemoteExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if n.server.needSync() || n.server.needUpdate() || n.sidecarConfigNeedsReload() { + // Call needUpdate on self to get ExecNode-specific resource comparison + if n.server.needSync() || n.needUpdate() || n.sidecarConfigNeedsReload() { return n.doSyncBase(ctx, dry) } diff --git a/pkg/components/httpproxy.go b/pkg/components/httpproxy.go index a8239723a..4b1e443ec 100644 --- a/pkg/components/httpproxy.go +++ b/pkg/components/httpproxy.go @@ -125,10 +125,32 @@ func (hp *HttpProxy) Fetch(ctx context.Context) error { ) } +// needUpdate overrides the server's needUpdate to account for HTTPS secret volumes +// that are added after StatefulSet build +func (hp *HttpProxy) needUpdate() bool { + return hp.server.needUpdateWithSpecCheck(hp.needStatefulSetSpecUpdate) +} + +// needStatefulSetSpecUpdate checks if the StatefulSet spec has changed +func (hp *HttpProxy) needStatefulSetSpecUpdate() bool { + // Rebuild the StatefulSet with HTTPS secret included + desiredSpec := hp.server.rebuildStatefulSet().Spec + hp.applyHttpsSecret(&desiredSpec.Template.Spec) + return hp.server.getStatefulSet().SpecChanged(desiredSpec) +} + +// applyHttpsSecret applies HTTPS secret volume and mount to the pod spec +func (hp *HttpProxy) applyHttpsSecret(podSpec *corev1.PodSpec) { + if hp.httpsSecret != nil { + hp.httpsSecret.AddVolume(podSpec) + hp.httpsSecret.AddVolumeMount(&podSpec.Containers[0]) + } +} + func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if ytv1.IsReadyToUpdateClusterState(hp.ytsaurus.GetClusterState()) && hp.server.needUpdate() { + if ytv1.IsReadyToUpdateClusterState(hp.ytsaurus.GetClusterState()) && hp.needUpdate() { return SimpleStatus(SyncStatusNeedLocalUpdate), err } @@ -149,10 +171,7 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err if hp.NeedSync() { if !dry { statefulSet := hp.server.buildStatefulSet() - if hp.httpsSecret != nil { - hp.httpsSecret.AddVolume(&statefulSet.Spec.Template.Spec) - hp.httpsSecret.AddVolumeMount(&statefulSet.Spec.Template.Spec.Containers[0]) - } + hp.applyHttpsSecret(&statefulSet.Spec.Template.Spec) err = hp.server.Sync(ctx) } return WaitingStatus(SyncStatusPending, "components"), err diff --git a/pkg/components/rpcproxy.go b/pkg/components/rpcproxy.go index 01cc80919..3a5733443 100644 --- a/pkg/components/rpcproxy.go +++ b/pkg/components/rpcproxy.go @@ -115,10 +115,32 @@ func (rp *RpcProxy) Fetch(ctx context.Context) error { return resources.Fetch(ctx, fetchable...) 
} +// needUpdate overrides the server's needUpdate to account for TLS secret volumes +// that are added after StatefulSet build +func (rp *RpcProxy) needUpdate() bool { + return rp.server.needUpdateWithSpecCheck(rp.needStatefulSetSpecUpdate) +} + +// needStatefulSetSpecUpdate checks if the StatefulSet spec has changed +func (rp *RpcProxy) needStatefulSetSpecUpdate() bool { + // Rebuild the StatefulSet with TLS secret included + desiredSpec := rp.server.rebuildStatefulSet().Spec + rp.applyTlsSecret(&desiredSpec.Template.Spec) + return rp.server.getStatefulSet().SpecChanged(desiredSpec) +} + +// applyTlsSecret applies TLS secret volume and mount to the pod spec +func (rp *RpcProxy) applyTlsSecret(podSpec *corev1.PodSpec) { + if rp.tlsSecret != nil { + rp.tlsSecret.AddVolume(podSpec) + rp.tlsSecret.AddVolumeMount(&podSpec.Containers[0]) + } +} + func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if ytv1.IsReadyToUpdateClusterState(rp.ytsaurus.GetClusterState()) && rp.server.needUpdate() { + if ytv1.IsReadyToUpdateClusterState(rp.ytsaurus.GetClusterState()) && rp.needUpdate() { return SimpleStatus(SyncStatusNeedLocalUpdate), err } @@ -139,10 +161,7 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro if rp.NeedSync() { if !dry { statefulSet := rp.server.buildStatefulSet() - if secret := rp.tlsSecret; secret != nil { - secret.AddVolume(&statefulSet.Spec.Template.Spec) - secret.AddVolumeMount(&statefulSet.Spec.Template.Spec.Containers[0]) - } + rp.applyTlsSecret(&statefulSet.Spec.Template.Spec) err = rp.server.Sync(ctx) } return WaitingStatus(SyncStatusPending, "components"), err diff --git a/pkg/components/server.go b/pkg/components/server.go index 8db0ce828..48b39d07e 100644 --- a/pkg/components/server.go +++ b/pkg/components/server.go @@ -29,11 +29,13 @@ type server interface { resources.Syncable podsManager needUpdate() bool + needUpdateWithSpecCheck(specCheckFunc func() bool) bool configNeedsReload() bool needBuild() bool needSync() bool buildStatefulSet() *appsv1.StatefulSet rebuildStatefulSet() *appsv1.StatefulSet + getStatefulSet() *resources.StatefulSet addCABundleMount(c *corev1.Container) addTlsSecretMount(c *corev1.Container) addMonitoringPort(port corev1.ServicePort) @@ -276,7 +278,9 @@ func (s *serverImpl) podsImageCorrespondsToSpec() bool { return found == len(s.sidecarImages) } -func (s *serverImpl) needUpdate() bool { +// needUpdateWithSpecCheck provides the common update checking logic. +// It accepts a spec checking function to allow component-specific spec comparisons. 
+func (s *serverImpl) needUpdateWithSpecCheck(specCheckFunc func() bool) bool { if !s.exists() { return false } @@ -289,7 +293,21 @@ func (s *serverImpl) needUpdate() bool { if err != nil { return false } - return needReload + if needReload { + return true + } + + return specCheckFunc() +} + +func (s *serverImpl) needUpdate() bool { + return s.needUpdateWithSpecCheck(s.needStatefulSetSpecUpdate) +} + +// needStatefulSetSpecUpdate checks if the StatefulSet spec has changed +func (s *serverImpl) needStatefulSetSpecUpdate() bool { + desiredSpec := s.rebuildStatefulSet().Spec + return s.statefulSet.SpecChanged(desiredSpec) } func (s *serverImpl) arePodsReady(ctx context.Context) bool { @@ -430,6 +448,10 @@ func (s *serverImpl) rebuildStatefulSet() *appsv1.StatefulSet { return statefulSet } +func (s *serverImpl) getStatefulSet() *resources.StatefulSet { + return s.statefulSet +} + func (s *serverImpl) removePods(ctx context.Context) error { ss := s.rebuildStatefulSet() ss.Spec.Replicas = ptr.To(int32(0)) diff --git a/pkg/components/suite_test.go b/pkg/components/suite_test.go index a35c44234..559e4fbc3 100644 --- a/pkg/components/suite_test.go +++ b/pkg/components/suite_test.go @@ -18,6 +18,7 @@ import ( "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/labeller" mock_yt "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/mock" + "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/resources" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/ypatch" ) @@ -137,6 +138,14 @@ func (fs *FakeServer) buildStatefulSet() *appsv1.StatefulSet { return nil } +func (fs *FakeServer) getStatefulSet() *resources.StatefulSet { + return nil +} + +func (fs *FakeServer) needUpdateWithSpecCheck(specCheckFunc func() bool) bool { + return false +} + func (fs *FakeServer) rebuildStatefulSet() *appsv1.StatefulSet { return nil } diff --git a/pkg/components/yql_agent.go b/pkg/components/yql_agent.go index 5ef95a666..921f0c18b 100644 --- a/pkg/components/yql_agent.go +++ b/pkg/components/yql_agent.go @@ -143,10 +143,37 @@ func (yqla *YqlAgent) createUpdateScript() string { return strings.Join(script, "\n") } +// applyYqlAgentContainerModifications applies YqlAgent-specific modifications to the container +func (yqla *YqlAgent) applyYqlAgentContainerModifications(container *corev1.Container) { + container.Command = []string{"sh", "-c", fmt.Sprintf("echo -n $YT_TOKEN > %s; %s", consts.DefaultYqlTokenPath, strings.Join(container.Command, " "))} + container.EnvFrom = []corev1.EnvFromSource{yqla.secret.GetEnvSource()} + if yqla.ytsaurus.GetResource().Spec.UseIPv6 && !yqla.ytsaurus.GetResource().Spec.UseIPv4 { + container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "1"}} + } else if !yqla.ytsaurus.GetResource().Spec.UseIPv6 && yqla.ytsaurus.GetResource().Spec.UseIPv4 { + container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "1"}, {Name: "YT_FORCE_IPV6", Value: "0"}} + } else { + container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "0"}} + } + container.Env = append(container.Env, getDefaultEnv()...) 
+}
+
+// needUpdate checks if YqlAgent needs an update, applying YqlAgent-specific modifications.
+func (yqla *YqlAgent) needUpdate() bool {
+	return yqla.server.needUpdateWithSpecCheck(yqla.needStatefulSetSpecUpdate)
+}
+
+// needStatefulSetSpecUpdate rebuilds the StatefulSet, applies the YqlAgent-specific container modifications, and compares the result with the deployed spec.
+func (yqla *YqlAgent) needStatefulSetSpecUpdate() bool {
+	ss := yqla.server.rebuildStatefulSet()
+	container := &ss.Spec.Template.Spec.Containers[0]
+	yqla.applyYqlAgentContainerModifications(container)
+	return yqla.server.getStatefulSet().SpecChanged(ss.Spec)
+}
+
 func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
 	var err error
 
-	if ytv1.IsReadyToUpdateClusterState(yqla.ytsaurus.GetClusterState()) && yqla.server.needUpdate() {
+	if ytv1.IsReadyToUpdateClusterState(yqla.ytsaurus.GetClusterState()) && yqla.needUpdate() {
 		return SimpleStatus(SyncStatusNeedLocalUpdate), err
 	}
 
@@ -194,17 +221,7 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er
 	if !dry {
 		ss := yqla.server.buildStatefulSet()
 		container := &ss.Spec.Template.Spec.Containers[0]
-		container.Command = []string{"sh", "-c", fmt.Sprintf("echo -n $YT_TOKEN > %s; %s", consts.DefaultYqlTokenPath, strings.Join(container.Command, " "))}
-		container.EnvFrom = []corev1.EnvFromSource{yqla.secret.GetEnvSource()}
-		if yqla.ytsaurus.GetResource().Spec.UseIPv6 && !yqla.ytsaurus.GetResource().Spec.UseIPv4 {
-			container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "1"}}
-		} else if !yqla.ytsaurus.GetResource().Spec.UseIPv6 && yqla.ytsaurus.GetResource().Spec.UseIPv4 {
-			container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "1"}, {Name: "YT_FORCE_IPV6", Value: "0"}}
-		} else {
-			container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "0"}}
-		}
-
-		container.Env = append(container.Env, getDefaultEnv()...)
+ yqla.applyYqlAgentContainerModifications(container) err = yqla.server.Sync(ctx) } return WaitingStatus(SyncStatusPending, "components"), err diff --git a/pkg/resources/compare_pod_template_spec.go b/pkg/resources/compare_pod_template_spec.go new file mode 100644 index 000000000..f2747de98 --- /dev/null +++ b/pkg/resources/compare_pod_template_spec.go @@ -0,0 +1,361 @@ +package resources + +import ( + "sort" + + corev1 "k8s.io/api/core/v1" +) + +func podTemplateSpecEqual(a, b corev1.PodTemplateSpec) bool { + return podSpecEqual(a.Spec, b.Spec) +} + +func podSpecEqual(a, b corev1.PodSpec) bool { + // Containers + if !containersEqual(a.Containers, b.Containers) { + return false + } + + // InitContainers + if !containersEqual(a.InitContainers, b.InitContainers) { + return false + } + + // Volumes + if !volumesEqual(a.Volumes, b.Volumes) { + return false + } + + // Node selector, tolerations, affinity + if !mapsEqual(a.NodeSelector, b.NodeSelector) { + return false + } + if !tolerationsEqual(a.Tolerations, b.Tolerations) { + return false + } + if !affinityEqual(a.Affinity, b.Affinity) { + return false + } + + // Service account + if a.ServiceAccountName != b.ServiceAccountName { + return false + } + + return true +} + +func containersEqual(a, b []corev1.Container) bool { + if len(a) != len(b) { + return false + } + + // Sort by container name for determinism + a, b = sortPair(a, b, func(c corev1.Container) string { return c.Name }) + + for i := range a { + ca := a[i] + cb := b[i] + if ca.Name != cb.Name { + return false + } + + if ca.Image != cb.Image { + return false + } + + if !stringSlicesEqual(ca.Command, cb.Command) { + return false + } + if !stringSlicesEqual(ca.Args, cb.Args) { + return false + } + + if !envsEqual(ca.Env, cb.Env) { + return false + } + + if !envFromEqual(ca.EnvFrom, cb.EnvFrom) { + return false + } + + if !ResourceRequirementsEqual(ca.Resources, cb.Resources) { + return false + } + + if !volumeMountsEqual(ca.VolumeMounts, cb.VolumeMounts) { + return false + } + } + + return true +} + +func envsEqual(a, b []corev1.EnvVar) bool { + if len(a) != len(b) { + return false + } + // Sort by env name + a, b = sortPair(a, b, func(e corev1.EnvVar) string { return e.Name }) + + for i := range a { + if a[i].Name != b[i].Name { + return false + } + if a[i].Value != b[i].Value { + return false + } + if (a[i].ValueFrom == nil) != (b[i].ValueFrom == nil) { + return false + } + } + + return true +} + +func envFromEqual(a, b []corev1.EnvFromSource) bool { + if len(a) != len(b) { + return false + } + + getKey := func(e corev1.EnvFromSource) string { + if e.ConfigMapRef != nil { + return "cm:" + e.ConfigMapRef.Name + } + if e.SecretRef != nil { + return "secret:" + e.SecretRef.Name + } + return "" + } + a, b = sortPair(a, b, getKey) + + for i := range a { + // Compare ConfigMapRef + if !nilnessEqual(a[i].ConfigMapRef, b[i].ConfigMapRef) { + return false + } + if a[i].ConfigMapRef != nil && a[i].ConfigMapRef.Name != b[i].ConfigMapRef.Name { + return false + } + + // Compare SecretRef + if !nilnessEqual(a[i].SecretRef, b[i].SecretRef) { + return false + } + if a[i].SecretRef != nil && a[i].SecretRef.Name != b[i].SecretRef.Name { + return false + } + } + + return true +} + +func tolerationsEqual(a, b []corev1.Toleration) bool { + if len(a) != len(b) { + return false + } + // Sort by key + a, b = sortPair(a, b, func(t corev1.Toleration) string { return t.Key }) + + for i := range a { + if a[i].Key != b[i].Key || + a[i].Operator != b[i].Operator || + a[i].Value != b[i].Value || + a[i].Effect != 
b[i].Effect { + return false + } + } + return true +} + +func affinityEqual(a, b *corev1.Affinity) bool { + if equal, shouldReturn := bothNilOrBothNotNil(a, b); shouldReturn { + return equal + } + + if !nodeAffinityEqual(a.NodeAffinity, b.NodeAffinity) { + return false + } + return true +} + +func nodeAffinityEqual(a, b *corev1.NodeAffinity) bool { + if equal, shouldReturn := bothNilOrBothNotNil(a, b); shouldReturn { + return equal + } + + if equal, shouldReturn := bothNilOrBothNotNil(a.RequiredDuringSchedulingIgnoredDuringExecution, b.RequiredDuringSchedulingIgnoredDuringExecution); shouldReturn { + return equal + } + + aTerms := a.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + bTerms := b.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + return len(aTerms) == len(bTerms) +} + +func volumeMountsEqual(a, b []corev1.VolumeMount) bool { + if len(a) != len(b) { + return false + } + a, b = sortPair(a, b, func(v corev1.VolumeMount) string { return v.Name }) + + for i := range a { + if a[i].Name != b[i].Name { + return false + } + if a[i].MountPath != b[i].MountPath { + return false + } + if a[i].SubPath != b[i].SubPath { + return false + } + if a[i].ReadOnly != b[i].ReadOnly { + return false + } + } + return true +} + +func volumesEqual(a, b []corev1.Volume) bool { + if len(a) != len(b) { + return false + } + a, b = sortPair(a, b, func(v corev1.Volume) string { return v.Name }) + + for i := range a { + if a[i].Name != b[i].Name { + return false + } + if !volumeSourceEqual(a[i].VolumeSource, b[i].VolumeSource) { + return false + } + } + return true +} + +// volumeSourceEqual compares key fields of common volume source types +func volumeSourceEqual(a, b corev1.VolumeSource) bool { + if !nilnessEqual(a.ConfigMap, b.ConfigMap) { + return false + } + if a.ConfigMap != nil && a.ConfigMap.Name != b.ConfigMap.Name { + return false + } + + if !nilnessEqual(a.Secret, b.Secret) { + return false + } + if a.Secret != nil && a.Secret.SecretName != b.Secret.SecretName { + return false + } + + if !nilnessEqual(a.PersistentVolumeClaim, b.PersistentVolumeClaim) { + return false + } + if a.PersistentVolumeClaim != nil && a.PersistentVolumeClaim.ClaimName != b.PersistentVolumeClaim.ClaimName { + return false + } + + if !nilnessEqual(a.EmptyDir, b.EmptyDir) { + return false + } + + if !nilnessEqual(a.HostPath, b.HostPath) { + return false + } + if a.HostPath != nil && a.HostPath.Path != b.HostPath.Path { + return false + } + + return true +} + +func mapsEqual(a, b map[string]string) bool { + if len(a) != len(b) { + return false + } + for k, v := range a { + if b[k] != v { + return false + } + } + return true +} + +func stringSlicesEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +// ResourceRequirementsEqual compares resource requirements, handling nil vs empty cases +func ResourceRequirementsEqual(oldResourceRequirements, newResourceRequirements corev1.ResourceRequirements) bool { + // Compare requests + if !ResourceListEqual(oldResourceRequirements.Requests, newResourceRequirements.Requests) { + return false + } + + // Compare limits + if !ResourceListEqual(oldResourceRequirements.Limits, newResourceRequirements.Limits) { + return false + } + + return true +} + +// ResourceListEqual compares resource lists +func ResourceListEqual(oldResourceList, newResourceList corev1.ResourceList) bool { + // If both are nil or empty, they're equal + if len(oldResourceList) == 0 && 
len(newResourceList) == 0 { + return true + } + + if len(oldResourceList) != len(newResourceList) { + return false + } + + // Compare each resource + for key, newVal := range newResourceList { + oldVal, exists := oldResourceList[key] + if !exists || !oldVal.Equal(newVal) { + return false + } + } + + return true +} + +func bothNilOrBothNotNil[T any](a, b *T) (equal bool, shouldReturn bool) { + if a == nil && b == nil { + return true, true + } + if (a == nil) != (b == nil) { + return false, true + } + return false, false +} + +// nilnessEqual checks if both pointers have the same nilness (both nil or both not nil) +func nilnessEqual[T any](a, b *T) bool { + return (a == nil) == (b == nil) +} + +// sortPair sorts two slices using the provided key extractor for comparison. +func sortPair[T any, K ~string](a, b []T, keyFunc func(T) K) (aSorted, bSorted []T) { + // Create copies to avoid mutating originals + aCopy := make([]T, len(a)) + bCopy := make([]T, len(b)) + copy(aCopy, a) + copy(bCopy, b) + + sort.Slice(aCopy, func(i, j int) bool { return keyFunc(aCopy[i]) < keyFunc(aCopy[j]) }) + sort.Slice(bCopy, func(i, j int) bool { return keyFunc(bCopy[i]) < keyFunc(bCopy[j]) }) + + return aCopy, bCopy +} diff --git a/pkg/resources/statefulset.go b/pkg/resources/statefulset.go index 94f73e7fa..8dae6389c 100644 --- a/pkg/resources/statefulset.go +++ b/pkg/resources/statefulset.go @@ -151,3 +151,8 @@ func (s *StatefulSet) NeedSync(replicas int32) bool { return s.oldObject.Spec.Replicas == nil || *s.oldObject.Spec.Replicas != replicas } + +// SpecChanged compares the old StatefulSet spec with a new spec to detect changes. +func (s *StatefulSet) SpecChanged(newSpec appsv1.StatefulSetSpec) bool { + return !podTemplateSpecEqual(s.oldObject.Spec.Template, newSpec.Template) +} diff --git a/test/e2e/helpers_test.go b/test/e2e/helpers_test.go index e1499880d..9b734166d 100644 --- a/test/e2e/helpers_test.go +++ b/test/e2e/helpers_test.go @@ -14,6 +14,7 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -47,9 +48,16 @@ func getKindControlPlaneNode() corev1.Node { nodeList := corev1.NodeList{} err := k8sClient.List(ctx, &nodeList) Expect(err).Should(Succeed()) - Expect(nodeList.Items).To(HaveLen(1)) - Expect(nodeList.Items[0].Name).To(HaveSuffix("kind-control-plane")) - return nodeList.Items[0] + + // Find the control plane node in multi-node Kind clusters + for _, node := range nodeList.Items { + if strings.HasSuffix(node.Name, "kind-control-plane") { + return node + } + } + + Fail("No control plane node found") + return corev1.Node{} } func getNodesAddresses() []string { @@ -629,3 +637,76 @@ func restartPod(ctx context.Context, namespace, name string) { HaveField("Status.Phase", Equal(corev1.PodRunning)), )) } + +// getStatefulSet retrieves a StatefulSet and waits for it to have containers +func getStatefulSet(ctx context.Context, name, namespace string, timeout time.Duration) appsv1.StatefulSet { + var sts appsv1.StatefulSet + Eventually(func(g Gomega) { + err := k8sClient.Get(ctx, ctrlcli.ObjectKey{ + Name: name, + Namespace: namespace, + }, &sts) + g.Expect(err).Should(Succeed()) + g.Expect(sts.Spec.Template.Spec.Containers).ShouldNot(BeEmpty()) + }, timeout, pollInterval).Should(Succeed()) + return sts +} + +// componentResourceVersions stores ResourceVersions of multiple components +type componentResourceVersions map[string]string + +// recordComponentVersions records ResourceVersions for multiple StatefulSets to verify they don't change +func recordComponentVersions(ctx context.Context, namespace string, componentNames ...string) componentResourceVersions { + versions := make(componentResourceVersions) + for _, name := range componentNames { + var sts appsv1.StatefulSet + Expect(k8sClient.Get(ctx, ctrlcli.ObjectKey{ + Name: name, + Namespace: namespace, + }, &sts)).Should(Succeed()) + versions[name] = sts.ResourceVersion + } + log.Info("Recorded StatefulSet versions", "versions", versions) + return versions +} + +// verifyComponentVersionsUnchanged verifies that StatefulSet ResourceVersions haven't changed +func verifyComponentVersionsUnchanged(ctx context.Context, namespace string, versionsBefore componentResourceVersions) { + for name, versionBefore := range versionsBefore { + var sts appsv1.StatefulSet + Expect(k8sClient.Get(ctx, ctrlcli.ObjectKey{ + Name: name, + Namespace: namespace, + }, &sts)).Should(Succeed()) + versionAfter := sts.ResourceVersion + log.Info("Comparing StatefulSet version", + "component", name, + "before", versionBefore, + "after", versionAfter) + Expect(versionAfter).Should(Equal(versionBefore), + "StatefulSet %s should NOT be updated when not in UpdateSelector", name) + } +} + +// setUpdatePlanAndUpdateYT sets UpdatePlan with given selector and updates Ytsaurus object +func setUpdatePlanAndUpdateYT(ctx context.Context, ytsaurus *ytv1.Ytsaurus, selector ytv1.ComponentUpdateSelector, description string) { + By(description) + ytsaurus.Spec.UpdatePlan = []ytv1.ComponentUpdateSelector{selector} + UpdateObject(ctx, ytsaurus) +} + +// setUpdatePlanForComponentAndUpdateYT sets UpdatePlan to allow only specific component updates and updates Ytsaurus object +func setUpdatePlanForComponentAndUpdateYT(ctx context.Context, ytsaurus *ytv1.Ytsaurus, componentType consts.ComponentType) { + setUpdatePlanAndUpdateYT(ctx, ytsaurus, ytv1.ComponentUpdateSelector{ + Component: ytv1.Component{ + Type: componentType, + }, + }, fmt.Sprintf("Setting 
UpdatePlan to allow only %s updates", componentType)) +} + +// blockAllComponentUpdates sets UpdatePlan to block all component updates +func blockAllComponentUpdates(ctx context.Context, ytsaurus *ytv1.Ytsaurus) { + setUpdatePlanAndUpdateYT(ctx, ytsaurus, ytv1.ComponentUpdateSelector{ + Class: consts.ComponentClassNothing, + }, "Setting UpdatePlan to block all component updates") +} diff --git a/test/e2e/ytsaurus_controller_test.go b/test/e2e/ytsaurus_controller_test.go index 8e88d6129..d659c1d54 100644 --- a/test/e2e/ytsaurus_controller_test.go +++ b/test/e2e/ytsaurus_controller_test.go @@ -32,7 +32,9 @@ import ( "go.ytsaurus.tech/yt/go/yt/ytrpc" "go.ytsaurus.tech/yt/go/yterrors" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" certv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" @@ -728,9 +730,7 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func() "pods shouldn't be recreated when update is blocked", ) - By("Update cluster update with selector: class=Everything") - ytsaurus.Spec.UpdatePlan = []ytv1.ComponentUpdateSelector{{Class: consts.ComponentClassEverything}} - UpdateObject(ctx, ytsaurus) + setUpdatePlanAndUpdateYT(ctx, ytsaurus, ytv1.ComponentUpdateSelector{Class: consts.ComponentClassEverything}, "Update cluster update with selector: class=Everything") EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) Expect(ytsaurus).Should(HaveClusterUpdatingComponents( @@ -792,13 +792,7 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func() Expect(pods.Deleted).To(BeEmpty(), "deleted") Expect(pods.Updated).To(ConsistOf("end-0"), "updated") - By("Run cluster update with selector:TabletNodesOnly") - ytsaurus.Spec.UpdatePlan = []ytv1.ComponentUpdateSelector{{ - Component: ytv1.Component{ - Type: consts.TabletNodeType, - }, - }} - UpdateObject(ctx, ytsaurus) + setUpdatePlanForComponentAndUpdateYT(ctx, ytsaurus, consts.TabletNodeType) EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) Expect(ytsaurus).Should(HaveClusterUpdatingComponents(consts.TabletNodeType)) @@ -852,11 +846,7 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func() Expect(pods.Deleted).To(BeEmpty(), "deleted") Expect(pods.Updated).To(ConsistOf("ms-0"), "updated") - By("Run cluster update with selector:StatelessOnly") - ytsaurus.Spec.UpdatePlan = []ytv1.ComponentUpdateSelector{{ - Class: consts.ComponentClassStateless, - }} - UpdateObject(ctx, ytsaurus) + setUpdatePlanAndUpdateYT(ctx, ytsaurus, ytv1.ComponentUpdateSelector{Class: consts.ComponentClassStateless}, "Run cluster update with selector:StatelessOnly") EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) Expect(ytsaurus).Should(HaveClusterUpdatingComponents( @@ -1789,6 +1779,368 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func() }) // integration }) +// Standalone test suite for statefulset spec updates +var _ = Describe("Statefulset spec update test", Label("statefulsetUpdate"), func() { + var namespace string + var ytsaurus *ytv1.Ytsaurus + var ytBuilder *testutil.YtsaurusBuilder + + BeforeEach(func() { + By("Creating namespace") + currentSpec := CurrentSpecReport() + namespaceObject := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-resources-", + Labels: map[string]string{ + "app.kubernetes.io/component": "test", + 
"app.kubernetes.io/name": "test-" + strings.Join(currentSpec.Labels(), "-"), + "app.kubernetes.io/part-of": "ytsaurus-dev", + }, + Annotations: map[string]string{ + "kubernetes.io/description": currentSpec.LeafNodeText, + }, + }, + } + Expect(k8sClient.Create(ctx, &namespaceObject)).Should(Succeed()) + namespace = namespaceObject.Name + log.Info("Namespace created", "namespace", namespace) + + By("Creating minimal Ytsaurus spec") + ytBuilder = &testutil.YtsaurusBuilder{ + Images: testutil.CurrentImages, + Namespace: namespace, + } + ytBuilder.CreateMinimal() + ytsaurus = ytBuilder.Ytsaurus + ytsaurus.Spec.CoreImage = testutil.YtsaurusImage24_2 + }) + + JustBeforeEach(func(ctx context.Context) { + By("Creating Ytsaurus object") + ytsaurus.SetNamespace(namespace) + Expect(k8sClient.Create(ctx, ytsaurus)).Should(Succeed()) + + By("Waiting for cluster to reach Running state") + EventuallyYtsaurus(ctx, ytsaurus, bootstrapTimeout).Should(HaveClusterStateRunning()) + }) + + AfterEach(func(ctx context.Context) { + if ShouldPreserveArtifacts() { + log.Info("Preserving artifacts", "namespace", namespace) + return + } + + By("Deleting Ytsaurus object") + if err := k8sClient.Delete(ctx, ytsaurus); err != nil { + log.Error(err, "Cannot delete ytsaurus") + } + + By("Deleting namespace") + namespaceObject := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + } + Expect(k8sClient.Delete(ctx, &namespaceObject)).Should(Succeed()) + }) + + It("Should update Discovery resource limits with UpdateSelector", func(ctx context.Context) { + By("Waiting for discovery StatefulSet to be created and ready") + statefulSetBefore := getStatefulSet(ctx, "ds", namespace, bootstrapTimeout) + + By("Recording original discovery server resource limits") + originalCPU := statefulSetBefore.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() + originalMemory := statefulSetBefore.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() + log.Info("Original discovery server resources", + "cpu", originalCPU, + "memory", originalMemory) + + // Record Master and HttpProxy StatefulSet ResourceVersions to verify they don't restart + By("Recording Master and HttpProxy StatefulSet ResourceVersions before update") + otherComponentVersions := recordComponentVersions(ctx, namespace, "ms", "hp") + + setUpdatePlanForComponentAndUpdateYT(ctx, ytsaurus, consts.DiscoveryType) + + By("Updating discovery server resource limits") + newCPULimit := "500m" + newMemoryLimit := "1Gi" + + ytsaurus.Spec.Discovery.InstanceSpec.Resources.Limits = corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(newCPULimit), + corev1.ResourceMemory: resource.MustParse(newMemoryLimit), + } + UpdateObject(ctx, ytsaurus) + + log.Info("Updated Ytsaurus spec with new resources", "newCPU", newCPULimit, "newMemory", newMemoryLimit) + + By("Waiting for operator to observe the spec change") + EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) + Expect(ytsaurus).Should(HaveClusterUpdatingComponents(consts.DiscoveryType)) + + By("Verifying StatefulSet spec was updated with new resource limits") + Eventually(func(g Gomega) { + var statefulSetAfter appsv1.StatefulSet + err := k8sClient.Get(ctx, client.ObjectKey{ + Name: "ds", + Namespace: namespace, + }, &statefulSetAfter) + g.Expect(err).Should(Succeed()) + g.Expect(statefulSetAfter.Spec.Template.Spec.Containers).ShouldNot(BeEmpty()) + + updatedCPU := statefulSetAfter.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() + 
updatedMemory := statefulSetAfter.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() + + log.Info("Checking updated StatefulSet resources", + "cpu", updatedCPU, + "memory", updatedMemory, + "expectedCPU", newCPULimit, + "expectedMemory", newMemoryLimit) + + g.Expect(updatedCPU).Should(Equal(newCPULimit), "CPU limit should be updated for Discovery") + g.Expect(updatedMemory).Should(Equal(newMemoryLimit), "Memory limit should be updated for Discovery") + }, upgradeTimeout, pollInterval).Should(Succeed()) + + By("Waiting for cluster to complete update and return to Running state") + EventuallyYtsaurus(ctx, ytsaurus, upgradeTimeout).Should(HaveClusterStateRunning()) + + By("Verifying Master and HttpProxy StatefulSets were NOT restarted") + verifyComponentVersionsUnchanged(ctx, namespace, otherComponentVersions) + + By("Test completed successfully - Discovery resource limits were updated via UpdateSelector") + }) + + It("Should update Discovery node selectors with UpdateSelector", func(ctx context.Context) { + By("Waiting for discovery StatefulSet to be created and ready") + statefulSetBefore := getStatefulSet(ctx, "ds", namespace, bootstrapTimeout) + + By("Recording original discovery server node selectors") + originalNodeSelectorCount := len(statefulSetBefore.Spec.Template.Spec.NodeSelector) + log.Info("Original discovery server node selectors", + "count", originalNodeSelectorCount, + "selectors", statefulSetBefore.Spec.Template.Spec.NodeSelector) + + // Record Master and HttpProxy StatefulSet ResourceVersions to verify they don't restart + By("Recording Master and HttpProxy StatefulSet ResourceVersions before update") + otherComponentVersions := recordComponentVersions(ctx, namespace, "ms", "hp") + + setUpdatePlanForComponentAndUpdateYT(ctx, ytsaurus, consts.DiscoveryType) + + By("Adding node selector to discovery server") + nodeSelectorKey := "test-node-label-doesnt-exist" + nodeSelectorValue := "test-value-doesnt-exist" + + // Add node selector to Discovery component + if ytsaurus.Spec.Discovery.InstanceSpec.NodeSelector == nil { + ytsaurus.Spec.Discovery.InstanceSpec.NodeSelector = make(map[string]string) + } + ytsaurus.Spec.Discovery.InstanceSpec.NodeSelector[nodeSelectorKey] = nodeSelectorValue + UpdateObject(ctx, ytsaurus) + + log.Info("Updated Ytsaurus spec with new node selector", + "key", nodeSelectorKey, + "value", nodeSelectorValue) + + By("Waiting for operator to observe the spec change") + EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) + Expect(ytsaurus).Should(HaveClusterUpdatingComponents(consts.DiscoveryType)) + + By("Verifying StatefulSet spec was updated with new node selector") + Eventually(func(g Gomega) { + var statefulSetAfter appsv1.StatefulSet + err := k8sClient.Get(ctx, client.ObjectKey{ + Name: "ds", + Namespace: namespace, + }, &statefulSetAfter) + g.Expect(err).Should(Succeed()) + g.Expect(statefulSetAfter.Spec.Template.Spec.Containers).ShouldNot(BeEmpty()) + + nodeSelector := statefulSetAfter.Spec.Template.Spec.NodeSelector + log.Info("Checking updated StatefulSet node selector", + "nodeSelector", nodeSelector, + "expectedKey", nodeSelectorKey, + "expectedValue", nodeSelectorValue) + + g.Expect(nodeSelector).Should(HaveKeyWithValue(nodeSelectorKey, nodeSelectorValue), + "Node selector should be updated for Discovery") + }, upgradeTimeout, pollInterval).Should(Succeed()) + + By("Verifying Master and HttpProxy StatefulSets were NOT restarted") + verifyComponentVersionsUnchanged(ctx, namespace, 
otherComponentVersions) + + By("Test completed successfully - Discovery node selectors were updated via UpdateSelector") + }) + + It("Should update Discovery environment variables with UpdateSelector", func(ctx context.Context) { + By("Waiting for discovery StatefulSet to be created and ready") + statefulSetBefore := getStatefulSet(ctx, "ds", namespace, bootstrapTimeout) + + By("Recording original discovery server environment variables") + originalEnvCount := len(statefulSetBefore.Spec.Template.Spec.Containers[0].Env) + log.Info("Original discovery server environment", + "envCount", originalEnvCount) + + // Record Master and HttpProxy StatefulSet ResourceVersions to verify they don't restart + By("Recording Master and HttpProxy StatefulSet ResourceVersions before update") + otherComponentVersions := recordComponentVersions(ctx, namespace, "ms", "hp") + + setUpdatePlanForComponentAndUpdateYT(ctx, ytsaurus, consts.DiscoveryType) + + // Manually add environment variables to Discovery StatefulSet spec + var statefulSet appsv1.StatefulSet + Expect(k8sClient.Get(ctx, client.ObjectKey{ + Name: "ds", + Namespace: namespace, + }, &statefulSet)).Should(Succeed()) + + env1Val := "foobar" + env2Val := "bazbar" + statefulSet.Spec.Template.Spec.Containers[0].Env = append( + statefulSet.Spec.Template.Spec.Containers[0].Env, + corev1.EnvVar{ + Name: "YTSAURUS_DISCOVERY_UPDATE_TEST_ENV_1", + Value: env1Val, + }, + corev1.EnvVar{ + Name: "YTSAURUS_DISCOVERY_UPDATE_TEST_ENV_2", + Value: env2Val, + }, + ) + Expect(k8sClient.Update(ctx, &statefulSet)).Should(Succeed()) + + By("Verifying StatefulSet spec was updated with new environment variables") + Eventually(func(g Gomega) { + var statefulSetAfter appsv1.StatefulSet + err := k8sClient.Get(ctx, client.ObjectKey{ + Name: "ds", + Namespace: namespace, + }, &statefulSetAfter) + g.Expect(err).Should(Succeed()) + g.Expect(statefulSetAfter.Spec.Template.Spec.Containers).ShouldNot(BeEmpty()) + + container := statefulSetAfter.Spec.Template.Spec.Containers[0] + log.Info("Checking updated StatefulSet environment", + "envCount", len(container.Env)) + + // Find the new environment variables + foundEnv1 := false + foundEnv2 := false + for _, env := range container.Env { + if env.Name == "YTSAURUS_DISCOVERY_UPDATE_TEST_ENV_1" { + g.Expect(env.Value).Should(Equal(env1Val), "env1 var should have correct value") + foundEnv1 = true + } + if env.Name == "YTSAURUS_DISCOVERY_UPDATE_TEST_ENV_2" { + g.Expect(env.Value).Should(Equal(env2Val), "env2 var should have correct value") + foundEnv2 = true + } + } + g.Expect(foundEnv1).Should(BeTrue(), "env1 var should be present in container") + g.Expect(foundEnv2).Should(BeTrue(), "env2 var should be present in container") + }, upgradeTimeout, pollInterval).Should(Succeed()) + + By("Waiting for cluster to complete update and return to Running state") + EventuallyYtsaurus(ctx, ytsaurus, upgradeTimeout).Should(HaveClusterStateRunning()) + + By("Verifying Master and HttpProxy StatefulSets were NOT restarted") + verifyComponentVersionsUnchanged(ctx, namespace, otherComponentVersions) + + By("Test completed successfully - Discovery environment variables were updated via UpdateSelector") + }) + + It("Should NOT update Discovery when UpdatePlan blocks updates", Label("resources", "blocked"), func(ctx context.Context) { + By("Waiting for discovery StatefulSet to be created and ready") + var statefulSetBefore appsv1.StatefulSet + Eventually(func(g Gomega) { + err := k8sClient.Get(ctx, client.ObjectKey{ + Name: "ds", + Namespace: namespace, + }, 
&statefulSetBefore) + g.Expect(err).Should(Succeed()) + g.Expect(statefulSetBefore.Spec.Template.Spec.Containers).ShouldNot(BeEmpty()) + }, bootstrapTimeout, pollInterval).Should(Succeed()) + + By("Recording original discovery pod UID") + var discoveryPodBefore corev1.Pod + Eventually(func(g Gomega) { + err := k8sClient.Get(ctx, client.ObjectKey{ + Name: "ds-0", + Namespace: namespace, + }, &discoveryPodBefore) + g.Expect(err).Should(Succeed()) + }, bootstrapTimeout, pollInterval).Should(Succeed()) + originalPodUID := discoveryPodBefore.UID + log.Info("Original discovery pod", "uid", originalPodUID) + + blockAllComponentUpdates(ctx, ytsaurus) + + By("Updating discovery server resource limits with UpdatePlan blocking all updates") + newCPULimit := "600m" + newMemoryLimit := "2Gi" + + ytsaurus.Spec.Discovery.InstanceSpec.Resources.Limits = corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(newCPULimit), + corev1.ResourceMemory: resource.MustParse(newMemoryLimit), + } + UpdateObject(ctx, ytsaurus) + + log.Info("Updated Ytsaurus spec with new resources but UpdatePlan blocks all updates", "newCPU", newCPULimit, "newMemory", newMemoryLimit) + + By("Waiting for operator to observe the spec change") + EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) + + By("Verifying cluster is in Running state but with blocked updates") + EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveClusterStateRunning()) + + // Check that Discovery is in the blocked components list + Eventually(func(g Gomega) { + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(ytsaurus), ytsaurus) + g.Expect(err).Should(Succeed()) + g.Expect(ytsaurus.Status.UpdateStatus.BlockedComponentsSummary).ShouldNot(BeEmpty(), + "Discovery should be blocked when UpdatePlan=ComponentClassNothing") + }, reactionTimeout, pollInterval).Should(Succeed()) + + By("Verifying StatefulSet spec was NOT updated") + Consistently(func(g Gomega) { + var statefulSetAfter appsv1.StatefulSet + err := k8sClient.Get(ctx, client.ObjectKey{ + Name: "ds", + Namespace: namespace, + }, &statefulSetAfter) + g.Expect(err).Should(Succeed()) + g.Expect(statefulSetAfter.Spec.Template.Spec.Containers).ShouldNot(BeEmpty()) + + // StatefulSet should still have original resource limits + currentCPU := statefulSetAfter.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() + currentMemory := statefulSetAfter.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() + + log.Info("Verifying StatefulSet resources unchanged", + "cpu", currentCPU, + "memory", currentMemory, + "shouldNotBeCPU", newCPULimit, + "shouldNotBeMemory", newMemoryLimit) + + // Should NOT have the new values + g.Expect(currentCPU).ShouldNot(Equal(newCPULimit), "CPU limit should NOT be updated without UpdatePlan") + g.Expect(currentMemory).ShouldNot(Equal(newMemoryLimit), "Memory limit should NOT be updated without UpdatePlan") + }, 10*time.Second, pollInterval).Should(Succeed()) + + By("Verifying discovery pod was NOT recreated") + var discoveryPodAfter corev1.Pod + Eventually(func(g Gomega) { + err := k8sClient.Get(ctx, client.ObjectKey{ + Name: "ds-0", + Namespace: namespace, + }, &discoveryPodAfter) + g.Expect(err).Should(Succeed()) + }, bootstrapTimeout, pollInterval).Should(Succeed()) + + Expect(discoveryPodAfter.UID).Should(Equal(originalPodUID), + "Discovery pod should NOT be recreated when UpdatePlan blocks updates") + + By("Test completed successfully - Discovery was NOT updated when blocked by UpdatePlan") + }) +}) + func 
checkClusterHealth(ytClient yt.Client) { By("Checking that cluster is alive") var res []string