diff --git a/.github/workflows/subflow_run_e2e_tests.yaml b/.github/workflows/subflow_run_e2e_tests.yaml index 563759f91..42166ab1b 100644 --- a/.github/workflows/subflow_run_e2e_tests.yaml +++ b/.github/workflows/subflow_run_e2e_tests.yaml @@ -23,13 +23,15 @@ jobs: strategy: fail-fast: false - max-parallel: 5 + max-parallel: 6 matrix: e2e: - name: "update basic" filter: "update && basic" - name: "update plan" - filter: "update && plan" + filter: "update && plan && !strategy" + - name: "update strategy" + filter: "update && plan && strategy" - name: "update other" filter: "update && !(basic || plan)" - name: "tls" diff --git a/controllers/component_manager.go b/controllers/component_manager.go index 026b2990b..2e6e341b5 100644 --- a/controllers/component_manager.go +++ b/controllers/component_manager.go @@ -49,13 +49,13 @@ func NewComponentManager( return allComponents } - d := components.NewDiscovery(cfgen, ytsaurus) m := components.NewMaster(cfgen, ytsaurus) var hps []components.Component for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies { hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec)) } yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0], getAllComponents) + d := components.NewDiscovery(cfgen, ytsaurus, yc) var dnds []components.Component if len(resource.Spec.DataNodes) > 0 { @@ -120,7 +120,7 @@ func NewComponentManager( } if resource.Spec.ControllerAgents != nil { - ca := components.NewControllerAgent(cfgen, ytsaurus, m) + ca := components.NewControllerAgent(cfgen, ytsaurus, m, yc) allComponents = append(allComponents, ca) } @@ -147,7 +147,7 @@ func NewComponentManager( } if resource.Spec.MasterCaches != nil { - mc := components.NewMasterCache(cfgen, ytsaurus) + mc := components.NewMasterCache(cfgen, ytsaurus, yc) allComponents = append(allComponents, mc) } diff --git a/pkg/components/controller_agent.go b/pkg/components/controller_agent.go index 47003e516..b691868a1 100644 --- a/pkg/components/controller_agent.go +++ b/pkg/components/controller_agent.go @@ -2,7 +2,10 @@ package components import ( "context" + "fmt" + "go.ytsaurus.tech/yt/go/ypath" + "go.ytsaurus.tech/yt/go/yt" corev1 "k8s.io/api/core/v1" ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1" @@ -14,11 +17,12 @@ import ( type ControllerAgent struct { localServerComponent - cfgen *ytconfig.Generator - master Component + cfgen *ytconfig.Generator + master Component + ytsaurusClient internalYtsaurusClient } -func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *ControllerAgent { +func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component, yc internalYtsaurusClient) *ControllerAgent { l := cfgen.GetComponentLabeller(consts.ControllerAgentType, "") resource := ytsaurus.GetResource() @@ -41,6 +45,7 @@ func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, localServerComponent: newLocalServerComponent(l, ytsaurus, srv), cfgen: cfgen, master: master, + ytsaurusClient: yc, } } @@ -56,8 +61,23 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu } if ca.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil { - return *status, err + if IsUpdatingComponent(ca.ytsaurus, ca) { + switch getComponentUpdateStrategy(ca.ytsaurus, consts.ControllerAgentType, ca.GetShortName()) { + case ytv1.ComponentUpdateModeTypeOnDelete: + if status, err := handleOnDeleteUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil { + return *status, err + } + default: + if status, err := handleBulkUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil { + return *status, err + } + } + + if ca.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation { + return ComponentStatusReady(), err + } + } else { + return ComponentStatusReadyAfter("Not updating component"), nil } } @@ -91,3 +111,59 @@ func (ca *ControllerAgent) Sync(ctx context.Context) error { _, err := ca.doSync(ctx, false) return err } + +type ControllerAgentsWithMaintenance struct { + Address string `yson:",value"` + Maintenance bool `yson:"maintenance,attr"` + Alerts []string `yson:"alerts,attr"` +} + +func (ca *ControllerAgent) UpdatePreCheck(ctx context.Context) ComponentStatus { + if ca.ytsaurusClient == nil { + return ComponentStatusBlocked("YtsaurusClient component is not available") + } + + ytClient := ca.ytsaurusClient.GetYtClient() + if ytClient == nil { + return ComponentStatusBlocked("YT client is not available") + } + + // Check that the number of instances in YT matches the expected instanceCount + expectedCount := int(ca.ytsaurus.GetResource().Spec.ControllerAgents.InstanceCount) + if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.ControllerAgentType, expectedCount); err != nil { + return ComponentStatusBlocked(err.Error()) + } + + controllerAgentsWithMaintenance := make([]ControllerAgentsWithMaintenance, 0) + cypressPath := consts.ComponentCypressPath(consts.ControllerAgentType) + + err := ca.ytsaurusClient.GetYtClient().ListNode(ctx, ypath.Path(cypressPath), &controllerAgentsWithMaintenance, &yt.ListNodeOptions{ + Attributes: []string{"maintenance", "alerts"}}) + if err != nil { + return ComponentStatusBlocked(err.Error()) + } + + for _, controllerAgent := range controllerAgentsWithMaintenance { + var connected bool + err = ca.ytsaurusClient.GetYtClient().GetNode( + ctx, + ypath.Path(fmt.Sprintf("%v/%v/orchid/controller_agent/service/connected", cypressPath, controllerAgent.Address)), + &connected, + nil) + if err != nil { + return ComponentStatusBlocked(err.Error()) + } + + if !connected { + msg := fmt.Sprintf("Controller agent is not connected: %v", controllerAgent.Address) + return ComponentStatusBlocked(msg) + } + + if controllerAgent.Maintenance { + msg := fmt.Sprintf("There is a controller agent in maintenance: %v", controllerAgent.Address) + return ComponentStatusBlocked(msg) + } + } + + return ComponentStatusReady() +} diff --git a/pkg/components/discovery.go b/pkg/components/discovery.go index b447883d1..4d3371803 100644 --- a/pkg/components/discovery.go +++ b/pkg/components/discovery.go @@ -2,6 +2,7 @@ package components import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" @@ -10,14 +11,16 @@ import ( "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/resources" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/ytconfig" + "go.ytsaurus.tech/yt/go/ypath" ) type Discovery struct { localServerComponent - cfgen *ytconfig.Generator + cfgen *ytconfig.Generator + ytsaurusClient internalYtsaurusClient } -func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Discovery { +func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient) *Discovery { l := cfgen.GetComponentLabeller(consts.DiscoveryType, "") resource := ytsaurus.GetResource() @@ -41,6 +44,7 @@ func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Disco return &Discovery{ localServerComponent: newLocalServerComponent(l, ytsaurus, srv), cfgen: cfgen, + ytsaurusClient: yc, } } @@ -56,8 +60,23 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } if d.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil { - return *status, err + if IsUpdatingComponent(d.ytsaurus, d) { + switch getComponentUpdateStrategy(d.ytsaurus, consts.DiscoveryType, d.GetShortName()) { + case ytv1.ComponentUpdateModeTypeOnDelete: + if status, err := handleOnDeleteUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil { + return *status, err + } + default: + if status, err := handleBulkUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil { + return *status, err + } + } + + if d.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation { + return ComponentStatusReady(), err + } + } else { + return ComponentStatusReadyAfter("Not updating component"), nil } } @@ -83,3 +102,39 @@ func (d *Discovery) Sync(ctx context.Context) error { _, err := d.doSync(ctx, false) return err } + +func (d *Discovery) UpdatePreCheck(ctx context.Context) ComponentStatus { + if d.ytsaurusClient == nil { + return ComponentStatusBlocked("YtsaurusClient component is not available") + } + + ytClient := d.ytsaurusClient.GetYtClient() + if ytClient == nil { + return ComponentStatusBlocked("YT client is not available") + } + + // Check that the number of instances in YT matches the expected instanceCount + expectedCount := int(d.ytsaurus.GetResource().Spec.Discovery.InstanceCount) + if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.DiscoveryType, expectedCount); err != nil { + return ComponentStatusBlocked(err.Error()) + } + + var discoveryServers []string + cypressPath := consts.ComponentCypressPath(consts.DiscoveryType) + + err := d.ytsaurusClient.GetYtClient().ListNode(ctx, ypath.Path(cypressPath), &discoveryServers, nil) + if err != nil { + return ComponentStatusBlocked(err.Error()) + } + + // Check that all discovery_servers are alive + for _, server := range discoveryServers { + orchidPath := ypath.Path(fmt.Sprintf("%s/%s/orchid", cypressPath, server)) + var orchidData map[string]interface{} + if err := ytClient.GetNode(ctx, orchidPath, &orchidData, nil); err != nil { + return ComponentStatusBlocked(fmt.Sprintf("Failed to get discovery orchid data for %s: %v", server, err)) + } + } + + return ComponentStatusReady() +} diff --git a/pkg/components/helpers.go b/pkg/components/helpers.go index ac3a442f7..f918670ea 100644 --- a/pkg/components/helpers.go +++ b/pkg/components/helpers.go @@ -199,55 +199,55 @@ func handleBulkUpdatingClusterState( ) (*ComponentStatus, error) { var err error - if ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsRemoval { - // Not in the pod removal phase, let the component handle other update states - return nil, err - } + switch ytsaurus.GetUpdateState() { + case ytv1.UpdateStateWaitingForPodsRemoval: + // Check if this component is using the new update mode + if !doesComponentUseNewUpdateMode(ytsaurus, cmp.GetType(), cmp.GetFullName()) { + if !dry { + err = removePods(ctx, server, cmpBase) + } + return ptr.To(ComponentStatusUpdateStep("pods removal")), err + } + ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ + Type: fmt.Sprintf("%s%s", cmp.GetFullName(), consts.ConditionBulkUpdateModeStarted), + Status: metav1.ConditionTrue, + Reason: "BulkUpdateModeStarted", + Message: "bulk update mode started", + }) + + // Run pre-checks if needed + if ytsaurus.ShouldRunPreChecks(cmp.GetType(), cmp.GetFullName()) { + if status, err := runPrechecks(ctx, ytsaurus, cmp); status != nil { + return status, err + } + } - // Check if this component is using the new update mode - if !doesComponentUseNewUpdateMode(ytsaurus, cmp.GetType(), cmp.GetFullName()) { + // Remove pods if !dry { + ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ + Type: cmp.GetLabeller().GetScalingDownCondition(), + Status: metav1.ConditionTrue, + Reason: "RemovingPods", + Message: "removing pods", + }) err = removePods(ctx, server, cmpBase) } return ptr.To(ComponentStatusUpdateStep("pods removal")), err - } - - ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ - Type: fmt.Sprintf("%s%s", cmp.GetFullName(), consts.ConditionBulkUpdateModeStarted), - Status: metav1.ConditionTrue, - Reason: "BulkUpdateModeStarted", - Message: "bulk update mode started", - }) - // Run pre-checks if needed - if ytsaurus.ShouldRunPreChecks(cmp.GetType(), cmp.GetFullName()) { - if status, err := runPrechecks(ctx, ytsaurus, cmp); status != nil { - return status, err - } - } - - // Remove pods - if !dry { - ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ - Type: cmp.GetLabeller().GetScalingDownCondition(), - Status: metav1.ConditionTrue, - Reason: "RemovingPods", - Message: "removing pods", - }) - err = removePods(ctx, server, cmpBase) - } - - // Update condition if state has progressed to pod creation - if ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForPodsCreation { + case ytv1.UpdateStateWaitingForPodsCreation: ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ Type: cmp.GetLabeller().GetScalingUpCondition(), Status: metav1.ConditionTrue, Reason: "CreatingPods", Message: "creating new pods", }) - } + // Let the component handle its own post-removal / pod-creation logic. + return nil, err - return ptr.To(ComponentStatusUpdateStep("pods removal")), err + default: + // Not in the pod removal phase, let the component handle other update states + return nil, err + } } // handleOnDeleteUpdatingClusterState handles the OnDelete mode where pods must be manually deleted by the user. @@ -421,7 +421,8 @@ func doesComponentUseNewUpdateMode(ytsaurus *apiproxy.Ytsaurus, componentType co func getComponentUpdateStrategy(ytsaurus *apiproxy.Ytsaurus, componentType consts.ComponentType, componentName string) ytv1.ComponentUpdateModeType { for _, selector := range ytsaurus.GetResource().Spec.UpdatePlan { if selector.Component.Type == componentType && - (selector.Component.Name == "" || selector.Component.Name == componentName) { + (selector.Component.Name == "" || selector.Component.Name == componentName) && + selector.Strategy != nil { return selector.Strategy.Type() } } diff --git a/pkg/components/httpproxy.go b/pkg/components/httpproxy.go index 2a134bdbb..18b850875 100644 --- a/pkg/components/httpproxy.go +++ b/pkg/components/httpproxy.go @@ -133,8 +133,23 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err } if hp.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, hp.ytsaurus, hp, &hp.localComponent, hp.server, dry); status != nil { - return *status, err + if IsUpdatingComponent(hp.ytsaurus, hp) { + switch getComponentUpdateStrategy(hp.ytsaurus, consts.HttpProxyType, hp.GetShortName()) { + case ytv1.ComponentUpdateModeTypeOnDelete: + if status, err := handleOnDeleteUpdatingClusterState(ctx, hp.ytsaurus, hp, &hp.localComponent, hp.server, dry); status != nil { + return *status, err + } + default: + if status, err := handleBulkUpdatingClusterState(ctx, hp.ytsaurus, hp, &hp.localComponent, hp.server, dry); status != nil { + return *status, err + } + } + + if hp.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation { + return ComponentStatusReady(), err + } + } else { + return ComponentStatusReadyAfter("Not updating component"), nil } } diff --git a/pkg/components/master.go b/pkg/components/master.go index 00126f4f6..ee587d763 100644 --- a/pkg/components/master.go +++ b/pkg/components/master.go @@ -420,8 +420,23 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error) if m.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForSidecarsInitializingPrepare || m.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForSidecarsInitialize { return m.sidecarsInit(ctx, dry) } - if status, err := handleUpdatingClusterState(ctx, m.ytsaurus, m, &m.localComponent, m.server, dry); status != nil { - return *status, err + if IsUpdatingComponent(m.ytsaurus, m) { + switch getComponentUpdateStrategy(m.ytsaurus, consts.MasterType, m.GetShortName()) { + case ytv1.ComponentUpdateModeTypeOnDelete: + if status, err := handleOnDeleteUpdatingClusterState(ctx, m.ytsaurus, m, &m.localComponent, m.server, dry); status != nil { + return *status, err + } + default: + if status, err := handleBulkUpdatingClusterState(ctx, m.ytsaurus, m, &m.localComponent, m.server, dry); status != nil { + return *status, err + } + } + + if m.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation { + return ComponentStatusReady(), err + } + } else { + return ComponentStatusReadyAfter("Not updating component"), nil } } diff --git a/pkg/components/master_caches.go b/pkg/components/master_caches.go index 9361bac00..5f5b33d40 100644 --- a/pkg/components/master_caches.go +++ b/pkg/components/master_caches.go @@ -14,10 +14,11 @@ import ( type MasterCache struct { localServerComponent - cfgen *ytconfig.Generator + cfgen *ytconfig.Generator + ytsaurusClient internalYtsaurusClient } -func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *MasterCache { +func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient) *MasterCache { l := cfgen.GetComponentLabeller(consts.MasterCacheType, "") resource := ytsaurus.GetResource() @@ -39,6 +40,7 @@ func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Mas return &MasterCache{ localServerComponent: newLocalServerComponent(l, ytsaurus, srv), cfgen: cfgen, + ytsaurusClient: yc, } } @@ -54,8 +56,23 @@ func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, e } if mc.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil { - return *status, err + if IsUpdatingComponent(mc.ytsaurus, mc) { + switch getComponentUpdateStrategy(mc.ytsaurus, consts.MasterCacheType, mc.GetShortName()) { + case ytv1.ComponentUpdateModeTypeOnDelete: + if status, err := handleOnDeleteUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil { + return *status, err + } + default: + if status, err := handleBulkUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil { + return *status, err + } + } + + if mc.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation { + return ComponentStatusReady(), err + } + } else { + return ComponentStatusReadyAfter("Not updating component"), nil } } @@ -99,3 +116,22 @@ func (mc *MasterCache) getHostAddressLabel() string { } return defaultHostAddressLabel } + +func (mc *MasterCache) UpdatePreCheck(ctx context.Context) ComponentStatus { + if mc.ytsaurusClient == nil { + return ComponentStatusBlocked("YtsaurusClient component is not available") + } + + ytClient := mc.ytsaurusClient.GetYtClient() + if ytClient == nil { + return ComponentStatusBlocked("YT client is not available") + } + + // Check that the number of instances in YT matches the expected instanceCount + expectedCount := int(mc.ytsaurus.GetResource().Spec.MasterCaches.InstanceCount) + if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.MasterCacheType, expectedCount); err != nil { + return ComponentStatusBlocked(err.Error()) + } + + return ComponentStatusReady() +} diff --git a/pkg/components/rpcproxy.go b/pkg/components/rpcproxy.go index adb524617..83aca2b1d 100644 --- a/pkg/components/rpcproxy.go +++ b/pkg/components/rpcproxy.go @@ -123,8 +123,23 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } if rp.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, rp.ytsaurus, rp, &rp.localComponent, rp.server, dry); status != nil { - return *status, err + if IsUpdatingComponent(rp.ytsaurus, rp) { + switch getComponentUpdateStrategy(rp.ytsaurus, consts.RpcProxyType, rp.GetShortName()) { + case ytv1.ComponentUpdateModeTypeOnDelete: + if status, err := handleOnDeleteUpdatingClusterState(ctx, rp.ytsaurus, rp, &rp.localComponent, rp.server, dry); status != nil { + return *status, err + } + default: + if status, err := handleBulkUpdatingClusterState(ctx, rp.ytsaurus, rp, &rp.localComponent, rp.server, dry); status != nil { + return *status, err + } + } + + if rp.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation { + return ComponentStatusReady(), err + } + } else { + return ComponentStatusReadyAfter("Not updating component"), nil } } diff --git a/pkg/components/ytsaurus_client.go b/pkg/components/ytsaurus_client.go index f2f2d077d..19824d93d 100644 --- a/pkg/components/ytsaurus_client.go +++ b/pkg/components/ytsaurus_client.go @@ -25,6 +25,10 @@ import ( "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/ytconfig" ) +const ( + hydraPath = "orchid/monitoring/hydra" +) + type internalYtsaurusClient interface { Component GetYtClient() yt.Client @@ -523,42 +527,12 @@ func (yc *YtsaurusClient) HandlePossibilityCheck(ctx context.Context) (ok bool, return false, msg, nil } - // Check masters. - primaryMasterAddresses := make([]string, 0) - err = yc.ytClient.ListNode(ctx, ypath.Path("//sys/primary_masters"), &primaryMasterAddresses, nil) + // Check is masters quorum healthy + msg, err = yc.checkMastersQuorumHealth(ctx) if err != nil { return false, "", err } - - leadingPrimaryMasterCount := 0 - followingPrimaryMasterCount := 0 - - for _, primaryMasterAddress := range primaryMasterAddresses { - var hydra MasterHydra - err = yc.ytClient.GetNode( - ctx, - ypath.Path(fmt.Sprintf("//sys/primary_masters/%v/orchid/monitoring/hydra", primaryMasterAddress)), - &hydra, - nil) - if err != nil { - return false, "", err - } - - if !hydra.Active { - msg = fmt.Sprintf("There is a non-active master: %v", primaryMasterAddresses) - return false, msg, nil - } - - switch hydra.State { - case MasterStateLeading: - leadingPrimaryMasterCount += 1 - case MasterStateFollowing: - followingPrimaryMasterCount += 1 - } - } - - if !(leadingPrimaryMasterCount == 1 && followingPrimaryMasterCount+1 == len(primaryMasterAddresses)) { - msg = "There is no leader or some peer is not active" + if msg != "" { return false, msg, nil } @@ -842,6 +816,61 @@ func (yc *YtsaurusClient) RecoverTableCells(ctx context.Context, bundles []ytv1. // Master actions. +type MastersWithMaintenance struct { + Address string `yson:",value"` + Maintenance bool `yson:"maintenance,attr"` +} + +func (yc *YtsaurusClient) checkMastersQuorumHealth(ctx context.Context) (string, error) { + primaryMastersWithMaintenance := make([]MastersWithMaintenance, 0) + cypressPath := consts.ComponentCypressPath(consts.MasterType) + + err := yc.ytClient.ListNode(ctx, ypath.Path(cypressPath), &primaryMastersWithMaintenance, &yt.ListNodeOptions{ + Attributes: []string{"maintenance"}}) + if err != nil { + return "", err + } + + leadingPrimaryMasterCount := 0 + followingPrimaryMasterCount := 0 + + for _, primaryMaster := range primaryMastersWithMaintenance { + var hydra MasterHydra + err = yc.ytClient.GetNode( + ctx, + ypath.Path(fmt.Sprintf("%v/%v/%v", cypressPath, primaryMaster.Address, hydraPath)), + &hydra, + nil) + if err != nil { + return "", err + } + + if !hydra.Active { + msg := fmt.Sprintf("There is a non-active master: %v", primaryMaster.Address) + return msg, nil + } + + if primaryMaster.Maintenance { + msg := fmt.Sprintf("There is a master in maintenance: %v", primaryMaster.Address) + return msg, nil + } + + switch hydra.State { + case MasterStateLeading: + leadingPrimaryMasterCount += 1 + case MasterStateFollowing: + followingPrimaryMasterCount += 1 + } + } + + if !(leadingPrimaryMasterCount == 1 && followingPrimaryMasterCount+1 == len(primaryMastersWithMaintenance)) { + msg := fmt.Sprintf("Quorum health check failed: leading=%d, following=%d, total=%d", + leadingPrimaryMasterCount, followingPrimaryMasterCount, len(primaryMastersWithMaintenance)) + return msg, nil + } + return "", nil +} + func (yc *YtsaurusClient) GetMasterMonitoringPaths(ctx context.Context) ([]string, error) { var monitoringPaths []string mastersInfo, err := yc.getAllMasters(ctx) @@ -851,7 +880,7 @@ func (yc *YtsaurusClient) GetMasterMonitoringPaths(ctx context.Context) ([]strin for _, masterInfo := range mastersInfo { for _, address := range masterInfo.Addresses { - monitoringPath := fmt.Sprintf("//sys/cluster_masters/%s/orchid/monitoring/hydra", address) + monitoringPath := fmt.Sprintf("//sys/cluster_masters/%s/%v", address, hydraPath) monitoringPaths = append(monitoringPaths, monitoringPath) } } @@ -917,7 +946,7 @@ type DataNodeMeta struct { func (yc *YtsaurusClient) getDataNodesInfo(ctx context.Context) ([]DataNodeMeta, error) { var dndMeta []DataNodeMeta - err := yc.ytClient.ListNode(ctx, ypath.Path("//sys/data_nodes"), &dndMeta, &yt.ListNodeOptions{ + err := yc.ytClient.ListNode(ctx, ypath.Path(consts.ComponentCypressPath(consts.DataNodeType)), &dndMeta, &yt.ListNodeOptions{ Attributes: []string{ "use_imaginary_chunk_locations", "state", diff --git a/pkg/testutil/spec_builders.go b/pkg/testutil/spec_builders.go index 3c7fa4d17..4b22d9771 100644 --- a/pkg/testutil/spec_builders.go +++ b/pkg/testutil/spec_builders.go @@ -484,8 +484,9 @@ func (b *YtsaurusBuilder) WithRPCProxies() { } func (b *YtsaurusBuilder) CreateHTTPProxiesSpec() ytv1.HTTPProxiesSpec { + stype := corev1.ServiceTypeNodePort return ytv1.HTTPProxiesSpec{ - ServiceType: "NodePort", + ServiceType: stype, InstanceSpec: ytv1.InstanceSpec{ InstanceCount: 1, MinReadyInstanceCount: b.MinReadyInstanceCount, diff --git a/test/e2e/helpers_test.go b/test/e2e/helpers_test.go index a407752de..33a092ad0 100644 --- a/test/e2e/helpers_test.go +++ b/test/e2e/helpers_test.go @@ -29,6 +29,7 @@ import ( ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts" + "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/testutil" "go.ytsaurus.tech/yt/go/guid" "go.ytsaurus.tech/yt/go/ypath" @@ -633,3 +634,54 @@ func restartPod(ctx context.Context, namespace, name string) { HaveField("Status.Phase", Equal(corev1.PodRunning)), )) } + +func currentImageForComponent(componentType consts.ComponentType) string { + if componentType == consts.QueryTrackerType { + return testutil.QueryTrackerImageCurrent + } + return testutil.YtsaurusImageCurrent +} + +func setComponentImage(ytsaurus *ytv1.Ytsaurus, componentType consts.ComponentType, image string) { + switch componentType { + case consts.QueryTrackerType: + ytsaurus.Spec.QueryTrackers.Image = ptr.To(image) + case consts.MasterType: + ytsaurus.Spec.CoreImage = image + case consts.DiscoveryType: + ytsaurus.Spec.Discovery.Image = ptr.To(image) + case consts.MasterCacheType: + ytsaurus.Spec.MasterCaches.Image = ptr.To(image) + case consts.ControllerAgentType: + ytsaurus.Spec.ControllerAgents.Image = ptr.To(image) + case consts.RpcProxyType: + ytsaurus.Spec.RPCProxies[0].Image = ptr.To(image) + case consts.HttpProxyType: + ytsaurus.Spec.HTTPProxies[0].Image = ptr.To(image) + case consts.SchedulerType: + ytsaurus.Spec.Schedulers.Image = ptr.To(image) + } +} + +func getComponentImage(ytsaurus *ytv1.Ytsaurus, componentType consts.ComponentType) string { + switch componentType { + case consts.QueryTrackerType: + return *ytsaurus.Spec.QueryTrackers.Image + case consts.MasterType: + return ytsaurus.Spec.CoreImage + case consts.DiscoveryType: + return *ytsaurus.Spec.Discovery.Image + case consts.MasterCacheType: + return *ytsaurus.Spec.MasterCaches.Image + case consts.ControllerAgentType: + return *ytsaurus.Spec.ControllerAgents.Image + case consts.RpcProxyType: + return *ytsaurus.Spec.RPCProxies[0].Image + case consts.HttpProxyType: + return *ytsaurus.Spec.HTTPProxies[0].Image + case consts.SchedulerType: + return *ytsaurus.Spec.Schedulers.Image + default: + return "unsupported component type" + } +} diff --git a/test/e2e/ytsaurus_controller_test.go b/test/e2e/ytsaurus_controller_test.go index c71ca8923..f2df0c3ed 100644 --- a/test/e2e/ytsaurus_controller_test.go +++ b/test/e2e/ytsaurus_controller_test.go @@ -358,7 +358,7 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func() } ytClient := getYtClient(httpClient, proxyPort+"://"+proxyAddress) // NOTE: WhoAmI right now does not retry. - if _, err := ytClient.WhoAmI(ctx, nil); err != nil { + if _, err := ytClient.WhoAmI(specCtx, nil); err != nil { return fmt.Sprintf("ytsaurus api error: %v", err) } var clusterHealth ClusterHealthReport @@ -1894,132 +1894,184 @@ exec "$@"` }) // integration - Context("Test update plan strategy", Label("update", "plan", "strategy"), func() { - BeforeEach(func() { - By("Adding base components") - ytBuilder.WithBaseComponents() - ytBuilder.WithQueryTracker() - ytsaurus.Spec.QueryTrackers = &ytv1.QueryTrackerSpec{ - InstanceSpec: ytv1.InstanceSpec{ - Image: ptr.To(testutil.QueryTrackerImagePrevious), - InstanceCount: 3, - }, - } - ytsaurus.Spec.UpdatePlan = []ytv1.ComponentUpdateSelector{ - { - Component: ytv1.Component{Type: consts.QueryTrackerType}, - Strategy: &ytv1.ComponentUpdateStrategy{ - RunPreChecks: ptr.To(true), - }, - }, - } - }) - - It("Should update query tracker by bulk strategy and have Running state", Label("query-tracker", "bulk"), func(ctx context.Context) { - - podsBefore := getComponentPods(ctx, namespace) - - By("Trigger QT update") - ytsaurus.Spec.QueryTrackers.Image = ptr.To(testutil.QueryTrackerImageFuture) + Context("update plan strategy testing", Label("update", "plan", "strategy"), func() { + DescribeTableSubtree("bulk strategy", Label("bulk"), + func(componentType consts.ComponentType, stsName string) { + BeforeEach(func() { + ytBuilder.WithBaseComponents() + + switch componentType { + case consts.QueryTrackerType: + ytBuilder.WithQueryTracker() + ytsaurus.Spec.QueryTrackers = &ytv1.QueryTrackerSpec{ + InstanceSpec: ytv1.InstanceSpec{ + Image: ptr.To(testutil.QueryTrackerImagePrevious), + InstanceCount: 3, + }, + } + case consts.MasterType: + ytsaurus.Spec.CoreImage = testutil.YtsaurusImagePrevious + ytsaurus.Spec.PrimaryMasters.InstanceCount = 3 + case consts.MasterCacheType: + ytsaurus.Spec.MasterCaches = &ytv1.MasterCachesSpec{ + InstanceSpec: ytv1.InstanceSpec{ + Image: ptr.To(testutil.YtsaurusImagePrevious), + InstanceCount: 3, + }, + } + case consts.DiscoveryType: + ytsaurus.Spec.Discovery.Image = ptr.To(testutil.YtsaurusImagePrevious) + ytsaurus.Spec.Discovery.InstanceCount = 3 + case consts.ControllerAgentType: + ytsaurus.Spec.ControllerAgents.Image = ptr.To(testutil.YtsaurusImagePrevious) + ytsaurus.Spec.ControllerAgents.InstanceCount = 3 + case consts.RpcProxyType: + ytBuilder.WithRPCProxies() + ytsaurus.Spec.RPCProxies[0].Image = ptr.To(testutil.YtsaurusImagePrevious) + ytsaurus.Spec.RPCProxies[0].InstanceCount = 3 + case consts.HttpProxyType: + ytsaurus.Spec.HTTPProxies[0].Image = ptr.To(testutil.YtsaurusImagePrevious) + ytsaurus.Spec.HTTPProxies[0].InstanceCount = 3 + } - UpdateObject(ctx, ytsaurus) - EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) + ytsaurus.Spec.UpdatePlan = []ytv1.ComponentUpdateSelector{ + { + Component: ytv1.Component{Type: componentType}, + Strategy: &ytv1.ComponentUpdateStrategy{ + RunPreChecks: ptr.To(true), + }, + }, + } + }) + It("Should update "+stsName+" in bulkUpdate mode and have Running state", func(ctx context.Context) { - By("Waiting cluster update completes") - EventuallyYtsaurus(ctx, ytsaurus, upgradeTimeout).Should(HaveClusterStateRunning()) + By("Trigger " + stsName + " update") + setComponentImage(ytsaurus, componentType, currentImageForComponent(componentType)) - podsAfter := getComponentPods(ctx, namespace) - pods := getChangedPods(podsBefore, podsAfter) + UpdateObject(ctx, ytsaurus) + EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) - Expect(pods.Updated).To(ConsistOf("qt-0", "qt-1", "qt-2")) + By("Waiting cluster update completes") + EventuallyYtsaurus(ctx, ytsaurus, upgradeTimeout).Should(HaveClusterStateRunning()) - for name, pod := range podsAfter { - if strings.HasPrefix(name, "qt-") { - Expect(pod.Spec.Containers[0].Image).To(Equal(*ytsaurus.Spec.QueryTrackers.Image)) - } - } - }) - }) + podsAfter := getComponentPods(ctx, namespace) + expectedImage := getComponentImage(ytsaurus, componentType) - Context("on-delete strategy for schedulers", Label("update", "plan", "strategy", "scheduler", "ondelete"), func() { - BeforeEach(func() { - ytBuilder.WithBaseComponents() - ytsaurus.Spec.Schedulers = &ytv1.SchedulersSpec{ - InstanceSpec: ytv1.InstanceSpec{ - Image: ptr.To(testutil.YtsaurusImagePrevious), - InstanceCount: 3, - }, - } - ytsaurus.Spec.UpdatePlan = []ytv1.ComponentUpdateSelector{ - { - Component: ytv1.Component{Type: consts.SchedulerType}, - Strategy: &ytv1.ComponentUpdateStrategy{ - OnDelete: &ytv1.ComponentOnDeleteUpdateMode{}, - RunPreChecks: ptr.To(true), - }, - }, - } - }) - It("Should update scheduler in onDelete mode and have Running state", func(ctx context.Context) { + for name, pod := range podsAfter { + if strings.HasPrefix(name, stsName+"-") { + Expect(pod.Spec.Containers[0].Image).To(Equal(expectedImage)) + } + } - podsBefore := getComponentPods(ctx, namespace) + checkClusterHealth(ctx, ytClient) + checkChunkLocations(ytClient) + }) + }, + Entry("update query tracker", Label(consts.GetStatefulSetPrefix(consts.QueryTrackerType)), consts.QueryTrackerType, consts.GetStatefulSetPrefix(consts.QueryTrackerType)), + Entry("update master", Label(consts.GetStatefulSetPrefix(consts.MasterType)), consts.MasterType, consts.GetStatefulSetPrefix(consts.MasterType)), + Entry("update controller agent", Label(consts.GetStatefulSetPrefix(consts.ControllerAgentType)), consts.ControllerAgentType, consts.GetStatefulSetPrefix(consts.ControllerAgentType)), + Entry("update discovery", Label(consts.GetStatefulSetPrefix(consts.DiscoveryType)), consts.DiscoveryType, consts.GetStatefulSetPrefix(consts.DiscoveryType)), + Entry("update master cache", Label(consts.GetStatefulSetPrefix(consts.MasterCacheType)), consts.MasterCacheType, consts.GetStatefulSetPrefix(consts.MasterCacheType)), + Entry("update rpc proxy", Label(consts.GetStatefulSetPrefix(consts.RpcProxyType)), consts.RpcProxyType, consts.GetStatefulSetPrefix(consts.RpcProxyType)), + Entry("update http proxy", Label(consts.GetStatefulSetPrefix(consts.HttpProxyType)), consts.HttpProxyType, consts.GetStatefulSetPrefix(consts.HttpProxyType)), + ) + DescribeTableSubtree("on-delete strategy", Label("ondelete"), + func(componentType consts.ComponentType, stsName string) { + BeforeEach(func() { + ytBuilder.WithBaseComponents() + switch componentType { + case consts.SchedulerType: + ytsaurus.Spec.Schedulers = &ytv1.SchedulersSpec{ + InstanceSpec: ytv1.InstanceSpec{ + Image: ptr.To(testutil.YtsaurusImagePrevious), + InstanceCount: 3, + }, + } + case consts.MasterType: + ytsaurus.Spec.CoreImage = testutil.YtsaurusImagePrevious + ytsaurus.Spec.PrimaryMasters.InstanceCount = 3 + case consts.RpcProxyType: + ytBuilder.WithRPCProxies() + ytsaurus.Spec.RPCProxies[0].Image = ptr.To(testutil.YtsaurusImagePrevious) + ytsaurus.Spec.RPCProxies[0].InstanceCount = 3 + case consts.HttpProxyType: + ytsaurus.Spec.HTTPProxies[0].Image = ptr.To(testutil.YtsaurusImagePrevious) + ytsaurus.Spec.HTTPProxies[0].InstanceCount = 3 + } + ytsaurus.Spec.UpdatePlan = []ytv1.ComponentUpdateSelector{ + { + Component: ytv1.Component{Type: componentType}, + Strategy: &ytv1.ComponentUpdateStrategy{ + OnDelete: &ytv1.ComponentOnDeleteUpdateMode{}, + RunPreChecks: ptr.To(true), + }, + }, + } + }) + It("should update "+stsName+" with OnDelete strategy and have cluster Running state", func(ctx context.Context) { + podsBefore := getComponentPods(ctx, namespace) - By("Trigger sch update") - ytsaurus.Spec.Schedulers.Image = ptr.To(testutil.YtsaurusImageCurrent) + By("Trigger " + stsName + " update") + setComponentImage(ytsaurus, componentType, currentImageForComponent(componentType)) - UpdateObject(ctx, ytsaurus) - EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) + UpdateObject(ctx, ytsaurus) + EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should(HaveObservedGeneration()) - By("Verify OnDelete mode is activated") - EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should( - HaveClusterUpdateState(ytv1.UpdateStateWaitingForPodsRemoval), - ) + By("Verify OnDelete mode is activated") + EventuallyYtsaurus(ctx, ytsaurus, reactionTimeout).Should( + HaveClusterUpdateState(ytv1.UpdateStateWaitingForPodsRemoval), + ) - By("Verify StatefulSet has OnDelete update strategy") - sch := appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "sch"}} - EventuallyObject(ctx, &sch, reactionTimeout).Should(HaveField("Spec.UpdateStrategy.Type", appsv1.OnDeleteStatefulSetStrategyType)) - - By("Verify pods are NOT updated after 1 minute") - Consistently(func() bool { - podsNow := getComponentPods(ctx, namespace) - for name, pod := range podsNow { - if strings.HasPrefix(name, "sch-") { - if pod.Spec.Containers[0].Image == *ytsaurus.Spec.Schedulers.Image { - return false // Pod was updated, which shouldn't happen + By("Verify StatefulSet has OnDelete update strategy") + stsObject := appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: stsName}} + EventuallyObject(ctx, &stsObject, reactionTimeout).Should(HaveField("Spec.UpdateStrategy.Type", appsv1.OnDeleteStatefulSetStrategyType)) + + By("Verify pods are NOT updated after 10 sec") + expectedImage := getComponentImage(ytsaurus, componentType) + Consistently(func() bool { + podsNow := getComponentPods(ctx, namespace) + for name, pod := range podsNow { + if strings.HasPrefix(name, stsName+"-") { + if pod.Spec.Containers[0].Image == expectedImage { + return false // Pod was updated, which shouldn't happen + } + } + } + return true // All pods still on old image + }, 10*time.Second).Should(BeTrue()) + + By("Manually delete component pods") + for name := range podsBefore { + if strings.HasPrefix(name, stsName+"-") { + var pod corev1.Pod + err := k8sClient.Get(ctx, types.NamespacedName{ + Namespace: namespace, + Name: name, + }, &pod) + if err == nil { + Expect(k8sClient.Delete(ctx, &pod)).Should(Succeed()) + } } } - } - return true // All pods still on old image - }, 1*time.Minute, 5*time.Second).Should(BeTrue()) - - By("Manually delete scheduler pods") - for name := range podsBefore { - if strings.HasPrefix(name, "sch-") { - var pod corev1.Pod - err := k8sClient.Get(ctx, types.NamespacedName{ - Namespace: namespace, - Name: name, - }, &pod) - if err == nil { - Expect(k8sClient.Delete(ctx, &pod)).Should(Succeed()) - } - } - } - By("Waiting cluster update completes") - EventuallyYtsaurus(ctx, ytsaurus, upgradeTimeout).Should(HaveClusterStateRunning()) - - podsAfter := getComponentPods(ctx, namespace) - pods := getChangedPods(podsBefore, podsAfter) + By("Waiting cluster update completes") + EventuallyYtsaurus(ctx, ytsaurus, upgradeTimeout).Should(HaveClusterStateRunning()) - Expect(pods.Updated).To(ConsistOf("sch-0", "sch-1", "sch-2")) + podsAfter := getComponentPods(ctx, namespace) + expectedImage = getComponentImage(ytsaurus, componentType) - for name, pod := range podsAfter { - if strings.HasPrefix(name, "sch-") { - Expect(pod.Spec.Containers[0].Image).To(Equal(*ytsaurus.Spec.Schedulers.Image)) - } - } - }) + for name, pod := range podsAfter { + if strings.HasPrefix(name, stsName+"-") { + Expect(pod.Spec.Containers[0].Image).To(Equal(expectedImage)) + } + } + }) + }, + Entry("update scheduler", Label(consts.GetStatefulSetPrefix(consts.SchedulerType)), consts.SchedulerType, consts.GetStatefulSetPrefix(consts.SchedulerType)), + Entry("update master", Label(consts.GetStatefulSetPrefix(consts.MasterType)), consts.MasterType, consts.GetStatefulSetPrefix(consts.MasterType)), + Entry("update rpc proxy", Label(consts.GetStatefulSetPrefix(consts.RpcProxyType)), consts.RpcProxyType, consts.GetStatefulSetPrefix(consts.RpcProxyType)), + Entry("update http proxy", Label(consts.GetStatefulSetPrefix(consts.HttpProxyType)), consts.HttpProxyType, consts.GetStatefulSetPrefix(consts.HttpProxyType)), + ) }) }) @@ -2139,7 +2191,7 @@ func checkChunkLocations(ytClient yt.Client) { } var values []yson.ValueWithAttrs - Expect(ytClient.ListNode(specCtx, ypath.Path("//sys/data_nodes"), &values, &yt.ListNodeOptions{ + Expect(ytClient.ListNode(specCtx, ypath.Path(consts.ComponentCypressPath(consts.DataNodeType)), &values, &yt.ListNodeOptions{ Attributes: []string{"use_imaginary_chunk_locations"}, })).Should(Succeed())