Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
f076d4e
Made operator update sts not only when replicas changed
qurname2 Oct 14, 2025
cc9e4bb
Fixed linter problem, added condition check with nil and updated need…
qurname2 Oct 14, 2025
4490193
Removed unnecessary logic from needUpdate and replcaed comparison log…
qurname2 Oct 14, 2025
081036c
Removed conditoin check with nil
qurname2 Oct 14, 2025
b483d56
Added custom Sync method for sts and test to check if sts was updated…
qurname2 Oct 15, 2025
99ceb81
strange github error about checkClusterBaseViability
qurname2 Oct 15, 2025
327b70b
Fixed checkClusterBaseViability problem after rebasing
qurname2 Oct 15, 2025
d424740
Probably found what should be changed in needUpdate
qurname2 Oct 16, 2025
7f9bb37
Added canComponentBeUpdated to the each component's doSync cause need…
qurname2 Oct 16, 2025
c7cb520
Linter fixed
qurname2 Oct 17, 2025
2a2ddb9
Rollbacked logic with getEffectiveUpdateSelectors and canUpdateCompon…
qurname2 Oct 17, 2025
2bdbed2
Added UpdateStateNone in the update_flow_steps and adjusted needUpdate
qurname2 Oct 17, 2025
46ee56b
Created workaround explicitly for the execNodes: needUpdate should be…
qurname2 Oct 17, 2025
68dc3ae
Created strict field-by-field sts comparison and added more tests
qurname2 Oct 19, 2025
0b281e0
Created strict field-by-field sts comparison and added more tests
qurname2 Oct 19, 2025
b82e079
found yqla specific logic with YT_FORCE_IPV*. It added vars after bui…
qurname2 Oct 19, 2025
0dbfba9
Fixed some comments
qurname2 Oct 19, 2025
c5a6e7a
Added needUpdate override for http,rpc-proxy due to tls volumes being…
qurname2 Oct 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
apiProxy "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/apiproxy"
)

// canUpdateComponent checks if a component matches any of the given selectors.
func canUpdateComponent(selectors []ytv1.ComponentUpdateSelector, component ytv1.Component) bool {
for _, selector := range selectors {
if selector.Class != consts.ComponentClassUnspecified {
Expand All @@ -40,7 +41,7 @@ func canUpdateComponent(selectors []ytv1.ComponentUpdateSelector, component ytv1

// Considers splits all the components in two groups: ones that can be updated and ones which update isblocked.
func chooseUpdatingComponents(spec ytv1.YtsaurusSpec, needUpdate []ytv1.Component, allComponents []ytv1.Component) (canUpdate []ytv1.Component, cannotUpdate []ytv1.Component) {
configuredSelectors := getEffectiveSelectors(spec)
configuredSelectors := getEffectiveUpdateSelectors(spec)

for _, component := range needUpdate {
upd := canUpdateComponent(configuredSelectors, component)
Expand Down Expand Up @@ -82,7 +83,8 @@ func needFullUpdate(needUpdate []ytv1.Component) bool {
return false
}

func getEffectiveSelectors(spec ytv1.YtsaurusSpec) []ytv1.ComponentUpdateSelector {
// getEffectiveUpdateSelectors returns the effective update selectors based on the spec configuration.
func getEffectiveUpdateSelectors(spec ytv1.YtsaurusSpec) []ytv1.ComponentUpdateSelector {
if spec.UpdatePlan != nil {
return spec.UpdatePlan
}
Expand Down
6 changes: 6 additions & 0 deletions controllers/update_flow_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func newFlowTree(head *flowStep) *flowTree {
func (f *flowTree) execute(ctx context.Context, ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager) (bool, error) {
var err error
currentState := ytsaurus.GetUpdateState()
// If UpdateState is empty (initial state), default to UpdateStateNone.
// This happens when an update is triggered for the first time before SaveUpdateState() is called.
// Otherwise, we would get a nil pointer dereference when looking up f.index[""].
if currentState == "" {
currentState = ytv1.UpdateStateNone
}
currentStep := f.index[currentState]

// will execute one step at a time
Expand Down
3 changes: 2 additions & 1 deletion pkg/components/exec_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func NewExecNode(
func (n *ExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
var err error

if ytv1.IsReadyToUpdateClusterState(n.ytsaurus.GetClusterState()) && (n.server.needUpdate() || n.sidecarConfigNeedsReload()) {
// Call needUpdate on self instead of n.server.needUpdate
if ytv1.IsReadyToUpdateClusterState(n.ytsaurus.GetClusterState()) && (n.needUpdate() || n.sidecarConfigNeedsReload()) {
return SimpleStatus(SyncStatusNeedLocalUpdate), err
}

Expand Down
80 changes: 62 additions & 18 deletions pkg/components/exec_node_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,70 @@ type baseExecNode struct {
sidecarConfig *ConfigMapBuilder
}

// addResourceList adds resources from newList to list, summing quantities for existing keys
func addResourceList(list, newList corev1.ResourceList) {
if newList == nil {
return
}
for name, quantity := range newList {
if value, ok := list[name]; ok {
value.Add(quantity)
list[name] = value
} else {
list[name] = quantity.DeepCopy()
}
}
}

// addJobResourcesToResources adds JobResources to the given ResourceRequirements if conditions are met
func (n *baseExecNode) addJobResourcesToResources(resources *corev1.ResourceRequirements) {
if n.spec.JobResources == nil || n.criConfig.Isolated {
return
}

// Ensure resource lists are initialized
if resources.Requests == nil {
resources.Requests = corev1.ResourceList{}
}
if resources.Limits == nil {
resources.Limits = corev1.ResourceList{}
}

addResourceList(resources.Requests, n.spec.JobResources.Requests)
addResourceList(resources.Limits, n.spec.JobResources.Limits)
}

func (n *baseExecNode) Fetch(ctx context.Context) error {
return resources.Fetch(ctx, n.server, n.sidecarConfig)
}

// needUpdate overrides the server's needUpdate to use ExecNode-specific resource comparison
func (n *baseExecNode) needUpdate() bool {
return n.server.needUpdateWithSpecCheck(n.needStatefulSetSpecUpdate)
}

// needStatefulSetSpecUpdate checks if resources have changed for ExecNode explicitly
// ExecNode adds JobResources to NodeResources, so we need to compute the expected total
func (n *baseExecNode) needStatefulSetSpecUpdate() bool {
oldSts := n.server.getStatefulSet().OldObject()

if oldSts.GetResourceVersion() == "" {
return false
}

if len(oldSts.Spec.Template.Spec.Containers) == 0 {
return false
}
oldContainer := oldSts.Spec.Template.Spec.Containers[0]

// Compute expected resources (NodeResources + JobResources if not isolated)
expectedResources := n.spec.InstanceSpec.Resources.DeepCopy()
n.addJobResourcesToResources(expectedResources)

// Compare expected with actual
return !resources.ResourceRequirementsEqual(oldContainer.Resources, *expectedResources)
}

func (n *baseExecNode) doBuildBase() error {
statefulSet := n.server.buildStatefulSet()
podSpec := &statefulSet.Spec.Template.Spec
Expand Down Expand Up @@ -61,24 +121,8 @@ func (n *baseExecNode) doBuildBase() error {
setContainerPrivileged(&podSpec.Containers[i])
}

if n.spec.JobResources != nil && !n.criConfig.Isolated {
// Pour job resources into node container if jobs are not isolated.
container := &podSpec.Containers[0]

addResourceList := func(list, newList corev1.ResourceList) {
for name, quantity := range newList {
if value, ok := list[name]; ok {
value.Add(quantity)
list[name] = value
} else {
list[name] = quantity.DeepCopy()
}
}
}

addResourceList(container.Resources.Requests, n.spec.JobResources.Requests)
addResourceList(container.Resources.Limits, n.spec.JobResources.Limits)
}
// Pour job resources into node container if jobs are not isolated.
n.addJobResourcesToResources(&podSpec.Containers[0].Resources)

if n.criConfig.Service != ytv1.CRIServiceNone {
if n.criConfig.Isolated {
Expand Down
3 changes: 2 additions & 1 deletion pkg/components/exec_node_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func NewRemoteExecNodes(
func (n *RemoteExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
var err error

if n.server.needSync() || n.server.needUpdate() || n.sidecarConfigNeedsReload() {
// Call needUpdate on self to get ExecNode-specific resource comparison
if n.server.needSync() || n.needUpdate() || n.sidecarConfigNeedsReload() {
return n.doSyncBase(ctx, dry)
}

Expand Down
29 changes: 24 additions & 5 deletions pkg/components/httpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,32 @@ func (hp *HttpProxy) Fetch(ctx context.Context) error {
)
}

// needUpdate overrides the server's needUpdate to account for HTTPS secret volumes
// that are added after StatefulSet build
func (hp *HttpProxy) needUpdate() bool {
return hp.server.needUpdateWithSpecCheck(hp.needStatefulSetSpecUpdate)
}

// needStatefulSetSpecUpdate checks if the StatefulSet spec has changed
func (hp *HttpProxy) needStatefulSetSpecUpdate() bool {
// Rebuild the StatefulSet with HTTPS secret included
desiredSpec := hp.server.rebuildStatefulSet().Spec
hp.applyHttpsSecret(&desiredSpec.Template.Spec)
return hp.server.getStatefulSet().SpecChanged(desiredSpec)
}

// applyHttpsSecret applies HTTPS secret volume and mount to the pod spec
func (hp *HttpProxy) applyHttpsSecret(podSpec *corev1.PodSpec) {
if hp.httpsSecret != nil {
hp.httpsSecret.AddVolume(podSpec)
hp.httpsSecret.AddVolumeMount(&podSpec.Containers[0])
}
}

func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
var err error

if ytv1.IsReadyToUpdateClusterState(hp.ytsaurus.GetClusterState()) && hp.server.needUpdate() {
if ytv1.IsReadyToUpdateClusterState(hp.ytsaurus.GetClusterState()) && hp.needUpdate() {
return SimpleStatus(SyncStatusNeedLocalUpdate), err
}

Expand All @@ -149,10 +171,7 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err
if hp.NeedSync() {
if !dry {
statefulSet := hp.server.buildStatefulSet()
if hp.httpsSecret != nil {
hp.httpsSecret.AddVolume(&statefulSet.Spec.Template.Spec)
hp.httpsSecret.AddVolumeMount(&statefulSet.Spec.Template.Spec.Containers[0])
}
hp.applyHttpsSecret(&statefulSet.Spec.Template.Spec)
err = hp.server.Sync(ctx)
}
return WaitingStatus(SyncStatusPending, "components"), err
Expand Down
29 changes: 24 additions & 5 deletions pkg/components/rpcproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,32 @@ func (rp *RpcProxy) Fetch(ctx context.Context) error {
return resources.Fetch(ctx, fetchable...)
}

// needUpdate overrides the server's needUpdate to account for TLS secret volumes
// that are added after StatefulSet build
func (rp *RpcProxy) needUpdate() bool {
return rp.server.needUpdateWithSpecCheck(rp.needStatefulSetSpecUpdate)
}

// needStatefulSetSpecUpdate checks if the StatefulSet spec has changed
func (rp *RpcProxy) needStatefulSetSpecUpdate() bool {
// Rebuild the StatefulSet with TLS secret included
desiredSpec := rp.server.rebuildStatefulSet().Spec
rp.applyTlsSecret(&desiredSpec.Template.Spec)
return rp.server.getStatefulSet().SpecChanged(desiredSpec)
}

// applyTlsSecret applies TLS secret volume and mount to the pod spec
func (rp *RpcProxy) applyTlsSecret(podSpec *corev1.PodSpec) {
if rp.tlsSecret != nil {
rp.tlsSecret.AddVolume(podSpec)
rp.tlsSecret.AddVolumeMount(&podSpec.Containers[0])
}
}

func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
var err error

if ytv1.IsReadyToUpdateClusterState(rp.ytsaurus.GetClusterState()) && rp.server.needUpdate() {
if ytv1.IsReadyToUpdateClusterState(rp.ytsaurus.GetClusterState()) && rp.needUpdate() {
return SimpleStatus(SyncStatusNeedLocalUpdate), err
}

Expand All @@ -139,10 +161,7 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
if rp.NeedSync() {
if !dry {
statefulSet := rp.server.buildStatefulSet()
if secret := rp.tlsSecret; secret != nil {
secret.AddVolume(&statefulSet.Spec.Template.Spec)
secret.AddVolumeMount(&statefulSet.Spec.Template.Spec.Containers[0])
}
rp.applyTlsSecret(&statefulSet.Spec.Template.Spec)
err = rp.server.Sync(ctx)
}
return WaitingStatus(SyncStatusPending, "components"), err
Expand Down
26 changes: 24 additions & 2 deletions pkg/components/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ type server interface {
resources.Syncable
podsManager
needUpdate() bool
needUpdateWithSpecCheck(specCheckFunc func() bool) bool
configNeedsReload() bool
needBuild() bool
needSync() bool
buildStatefulSet() *appsv1.StatefulSet
rebuildStatefulSet() *appsv1.StatefulSet
getStatefulSet() *resources.StatefulSet
addCABundleMount(c *corev1.Container)
addTlsSecretMount(c *corev1.Container)
addMonitoringPort(port corev1.ServicePort)
Expand Down Expand Up @@ -276,7 +278,9 @@ func (s *serverImpl) podsImageCorrespondsToSpec() bool {
return found == len(s.sidecarImages)
}

func (s *serverImpl) needUpdate() bool {
// needUpdateWithSpecCheck provides the common update checking logic.
// It accepts a spec checking function to allow component-specific spec comparisons.
func (s *serverImpl) needUpdateWithSpecCheck(specCheckFunc func() bool) bool {
if !s.exists() {
return false
}
Expand All @@ -289,7 +293,21 @@ func (s *serverImpl) needUpdate() bool {
if err != nil {
return false
}
return needReload
if needReload {
return true
}

return specCheckFunc()
}

func (s *serverImpl) needUpdate() bool {
return s.needUpdateWithSpecCheck(s.needStatefulSetSpecUpdate)
}

// needStatefulSetSpecUpdate checks if the StatefulSet spec has changed
func (s *serverImpl) needStatefulSetSpecUpdate() bool {
desiredSpec := s.rebuildStatefulSet().Spec
return s.statefulSet.SpecChanged(desiredSpec)
}

func (s *serverImpl) arePodsReady(ctx context.Context) bool {
Expand Down Expand Up @@ -430,6 +448,10 @@ func (s *serverImpl) rebuildStatefulSet() *appsv1.StatefulSet {
return statefulSet
}

func (s *serverImpl) getStatefulSet() *resources.StatefulSet {
return s.statefulSet
}

func (s *serverImpl) removePods(ctx context.Context) error {
ss := s.rebuildStatefulSet()
ss.Spec.Replicas = ptr.To(int32(0))
Expand Down
9 changes: 9 additions & 0 deletions pkg/components/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/labeller"
mock_yt "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/mock"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/resources"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/ypatch"
)

Expand Down Expand Up @@ -137,6 +138,14 @@ func (fs *FakeServer) buildStatefulSet() *appsv1.StatefulSet {
return nil
}

func (fs *FakeServer) getStatefulSet() *resources.StatefulSet {
return nil
}

func (fs *FakeServer) needUpdateWithSpecCheck(specCheckFunc func() bool) bool {
return false
}

func (fs *FakeServer) rebuildStatefulSet() *appsv1.StatefulSet {
return nil
}
Expand Down
41 changes: 29 additions & 12 deletions pkg/components/yql_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,37 @@ func (yqla *YqlAgent) createUpdateScript() string {
return strings.Join(script, "\n")
}

// applyYqlAgentContainerModifications applies YqlAgent-specific modifications to the container
func (yqla *YqlAgent) applyYqlAgentContainerModifications(container *corev1.Container) {
container.Command = []string{"sh", "-c", fmt.Sprintf("echo -n $YT_TOKEN > %s; %s", consts.DefaultYqlTokenPath, strings.Join(container.Command, " "))}
container.EnvFrom = []corev1.EnvFromSource{yqla.secret.GetEnvSource()}
if yqla.ytsaurus.GetResource().Spec.UseIPv6 && !yqla.ytsaurus.GetResource().Spec.UseIPv4 {
container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "1"}}
} else if !yqla.ytsaurus.GetResource().Spec.UseIPv6 && yqla.ytsaurus.GetResource().Spec.UseIPv4 {
container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "1"}, {Name: "YT_FORCE_IPV6", Value: "0"}}
} else {
container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "0"}}
}
container.Env = append(container.Env, getDefaultEnv()...)
}

// needUpdate checks if YqlAgent needs an update, applying YqlAgent-specific modifications
func (yqla *YqlAgent) needUpdate() bool {
return yqla.server.needUpdateWithSpecCheck(yqla.needStatefulSetSpecUpdate)
}

// needStatefulSetSpecUpdate this applies YqlAgent-specific modifications before comparison
func (yqla *YqlAgent) needStatefulSetSpecUpdate() bool {
ss := yqla.server.rebuildStatefulSet()
container := &ss.Spec.Template.Spec.Containers[0]
yqla.applyYqlAgentContainerModifications(container)
return yqla.server.getStatefulSet().SpecChanged(ss.Spec)
}

func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
var err error

if ytv1.IsReadyToUpdateClusterState(yqla.ytsaurus.GetClusterState()) && yqla.server.needUpdate() {
if ytv1.IsReadyToUpdateClusterState(yqla.ytsaurus.GetClusterState()) && yqla.needUpdate() {
return SimpleStatus(SyncStatusNeedLocalUpdate), err
}

Expand Down Expand Up @@ -194,17 +221,7 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er
if !dry {
ss := yqla.server.buildStatefulSet()
container := &ss.Spec.Template.Spec.Containers[0]
container.Command = []string{"sh", "-c", fmt.Sprintf("echo -n $YT_TOKEN > %s; %s", consts.DefaultYqlTokenPath, strings.Join(container.Command, " "))}
container.EnvFrom = []corev1.EnvFromSource{yqla.secret.GetEnvSource()}
if yqla.ytsaurus.GetResource().Spec.UseIPv6 && !yqla.ytsaurus.GetResource().Spec.UseIPv4 {
container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "1"}}
} else if !yqla.ytsaurus.GetResource().Spec.UseIPv6 && yqla.ytsaurus.GetResource().Spec.UseIPv4 {
container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "1"}, {Name: "YT_FORCE_IPV6", Value: "0"}}
} else {
container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "0"}}
}

container.Env = append(container.Env, getDefaultEnv()...)
yqla.applyYqlAgentContainerModifications(container)
err = yqla.server.Sync(ctx)
}
return WaitingStatus(SyncStatusPending, "components"), err
Expand Down
Loading
Loading