Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/subflow_run_e2e_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ jobs:

strategy:
fail-fast: false
max-parallel: 5
max-parallel: 6
matrix:
e2e:
- name: "update basic"
filter: "update && basic"
- name: "update plan"
filter: "update && plan"
filter: "update && plan && !strategy"
- name: "update strategy"
filter: "update && plan && strategy"
- name: "update other"
filter: "update && !(basic || plan)"
- name: "tls"
Expand Down
6 changes: 3 additions & 3 deletions controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func NewComponentManager(
return allComponents
}

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
}
yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0], getAllComponents)
d := components.NewDiscovery(cfgen, ytsaurus, yc)

var dnds []components.Component
if len(resource.Spec.DataNodes) > 0 {
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewComponentManager(
}

if resource.Spec.ControllerAgents != nil {
ca := components.NewControllerAgent(cfgen, ytsaurus, m)
ca := components.NewControllerAgent(cfgen, ytsaurus, m, yc)
allComponents = append(allComponents, ca)
}

Expand All @@ -147,7 +147,7 @@ func NewComponentManager(
}

if resource.Spec.MasterCaches != nil {
mc := components.NewMasterCache(cfgen, ytsaurus)
mc := components.NewMasterCache(cfgen, ytsaurus, yc)
allComponents = append(allComponents, mc)
}

Expand Down
86 changes: 81 additions & 5 deletions pkg/components/controller_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package components

import (
"context"
"fmt"

"go.ytsaurus.tech/yt/go/ypath"
"go.ytsaurus.tech/yt/go/yt"
corev1 "k8s.io/api/core/v1"

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
Expand All @@ -14,11 +17,12 @@ import (

type ControllerAgent struct {
localServerComponent
cfgen *ytconfig.Generator
master Component
cfgen *ytconfig.Generator
master Component
ytsaurusClient internalYtsaurusClient
}

func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *ControllerAgent {
func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component, yc internalYtsaurusClient) *ControllerAgent {
l := cfgen.GetComponentLabeller(consts.ControllerAgentType, "")
resource := ytsaurus.GetResource()

Expand All @@ -41,6 +45,7 @@ func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus,
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
master: master,
ytsaurusClient: yc,
}
}

Expand All @@ -56,8 +61,23 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu
}

if ca.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(ca.ytsaurus, ca) {
switch getComponentUpdateStrategy(ca.ytsaurus, consts.ControllerAgentType, ca.GetShortName()) {
case ytv1.ComponentUpdateModeTypeOnDelete:
if status, err := handleOnDeleteUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
}
default:
if status, err := handleBulkUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
}
}

if ca.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation {
return ComponentStatusReady(), err
}
} else {
return ComponentStatusReadyAfter("Not updating component"), nil
}
}

Expand Down Expand Up @@ -91,3 +111,59 @@ func (ca *ControllerAgent) Sync(ctx context.Context) error {
_, err := ca.doSync(ctx, false)
return err
}

type ControllerAgentsWithMaintenance struct {
Address string `yson:",value"`
Maintenance bool `yson:"maintenance,attr"`
Alerts []string `yson:"alerts,attr"`
}

func (ca *ControllerAgent) UpdatePreCheck(ctx context.Context) ComponentStatus {
if ca.ytsaurusClient == nil {
return ComponentStatusBlocked("YtsaurusClient component is not available")
}

ytClient := ca.ytsaurusClient.GetYtClient()
if ytClient == nil {
return ComponentStatusBlocked("YT client is not available")
}

// Check that the number of instances in YT matches the expected instanceCount
expectedCount := int(ca.ytsaurus.GetResource().Spec.ControllerAgents.InstanceCount)
if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.ControllerAgentType, expectedCount); err != nil {
return ComponentStatusBlocked(err.Error())
}

controllerAgentsWithMaintenance := make([]ControllerAgentsWithMaintenance, 0)
cypressPath := consts.ComponentCypressPath(consts.ControllerAgentType)

err := ca.ytsaurusClient.GetYtClient().ListNode(ctx, ypath.Path(cypressPath), &controllerAgentsWithMaintenance, &yt.ListNodeOptions{
Attributes: []string{"maintenance", "alerts"}})
if err != nil {
return ComponentStatusBlocked(err.Error())
}

for _, controllerAgent := range controllerAgentsWithMaintenance {
var connected bool
err = ca.ytsaurusClient.GetYtClient().GetNode(
ctx,
ypath.Path(fmt.Sprintf("%v/%v/orchid/controller_agent/service/connected", cypressPath, controllerAgent.Address)),
&connected,
nil)
if err != nil {
return ComponentStatusBlocked(err.Error())
}

if !connected {
msg := fmt.Sprintf("Controller agent is not connected: %v", controllerAgent.Address)
return ComponentStatusBlocked(msg)
}

if controllerAgent.Maintenance {
msg := fmt.Sprintf("There is a controller agent in maintenance: %v", controllerAgent.Address)
return ComponentStatusBlocked(msg)
}
}

return ComponentStatusReady()
}
63 changes: 59 additions & 4 deletions pkg/components/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package components

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"

Expand All @@ -10,14 +11,16 @@ import (
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/resources"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/ytconfig"
"go.ytsaurus.tech/yt/go/ypath"
)

type Discovery struct {
localServerComponent
cfgen *ytconfig.Generator
cfgen *ytconfig.Generator
ytsaurusClient internalYtsaurusClient
}

func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Discovery {
func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient) *Discovery {
l := cfgen.GetComponentLabeller(consts.DiscoveryType, "")
resource := ytsaurus.GetResource()

Expand All @@ -41,6 +44,7 @@ func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Disco
return &Discovery{
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
ytsaurusClient: yc,
}
}

Expand All @@ -56,8 +60,23 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
}

if d.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(d.ytsaurus, d) {
switch getComponentUpdateStrategy(d.ytsaurus, consts.DiscoveryType, d.GetShortName()) {
case ytv1.ComponentUpdateModeTypeOnDelete:
if status, err := handleOnDeleteUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
}
default:
if status, err := handleBulkUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
}
}

if d.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation {
return ComponentStatusReady(), err
}
} else {
return ComponentStatusReadyAfter("Not updating component"), nil
}
}

Expand All @@ -83,3 +102,39 @@ func (d *Discovery) Sync(ctx context.Context) error {
_, err := d.doSync(ctx, false)
return err
}

func (d *Discovery) UpdatePreCheck(ctx context.Context) ComponentStatus {
if d.ytsaurusClient == nil {
return ComponentStatusBlocked("YtsaurusClient component is not available")
}

ytClient := d.ytsaurusClient.GetYtClient()
if ytClient == nil {
return ComponentStatusBlocked("YT client is not available")
}

// Check that the number of instances in YT matches the expected instanceCount
expectedCount := int(d.ytsaurus.GetResource().Spec.Discovery.InstanceCount)
if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.DiscoveryType, expectedCount); err != nil {
return ComponentStatusBlocked(err.Error())
}

var discoveryServers []string
cypressPath := consts.ComponentCypressPath(consts.DiscoveryType)

err := d.ytsaurusClient.GetYtClient().ListNode(ctx, ypath.Path(cypressPath), &discoveryServers, nil)
if err != nil {
return ComponentStatusBlocked(err.Error())
}

// Check that all discovery_servers are alive
for _, server := range discoveryServers {
orchidPath := ypath.Path(fmt.Sprintf("%s/%s/orchid", cypressPath, server))
var orchidData map[string]interface{}
if err := ytClient.GetNode(ctx, orchidPath, &orchidData, nil); err != nil {
return ComponentStatusBlocked(fmt.Sprintf("Failed to get discovery orchid data for %s: %v", server, err))
}
}

return ComponentStatusReady()
}
75 changes: 38 additions & 37 deletions pkg/components/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,55 +199,55 @@ func handleBulkUpdatingClusterState(
) (*ComponentStatus, error) {
var err error

if ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsRemoval {
// Not in the pod removal phase, let the component handle other update states
return nil, err
}
switch ytsaurus.GetUpdateState() {
case ytv1.UpdateStateWaitingForPodsRemoval:
// Check if this component is using the new update mode
if !doesComponentUseNewUpdateMode(ytsaurus, cmp.GetType(), cmp.GetFullName()) {
if !dry {
err = removePods(ctx, server, cmpBase)
}
return ptr.To(ComponentStatusUpdateStep("pods removal")), err
}
ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: fmt.Sprintf("%s%s", cmp.GetFullName(), consts.ConditionBulkUpdateModeStarted),
Status: metav1.ConditionTrue,
Reason: "BulkUpdateModeStarted",
Message: "bulk update mode started",
})

// Run pre-checks if needed
if ytsaurus.ShouldRunPreChecks(cmp.GetType(), cmp.GetFullName()) {
if status, err := runPrechecks(ctx, ytsaurus, cmp); status != nil {
return status, err
}
}

// Check if this component is using the new update mode
if !doesComponentUseNewUpdateMode(ytsaurus, cmp.GetType(), cmp.GetFullName()) {
// Remove pods
if !dry {
ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: cmp.GetLabeller().GetScalingDownCondition(),
Status: metav1.ConditionTrue,
Reason: "RemovingPods",
Message: "removing pods",
})
err = removePods(ctx, server, cmpBase)
}
return ptr.To(ComponentStatusUpdateStep("pods removal")), err
}

ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: fmt.Sprintf("%s%s", cmp.GetFullName(), consts.ConditionBulkUpdateModeStarted),
Status: metav1.ConditionTrue,
Reason: "BulkUpdateModeStarted",
Message: "bulk update mode started",
})

// Run pre-checks if needed
if ytsaurus.ShouldRunPreChecks(cmp.GetType(), cmp.GetFullName()) {
if status, err := runPrechecks(ctx, ytsaurus, cmp); status != nil {
return status, err
}
}

// Remove pods
if !dry {
ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: cmp.GetLabeller().GetScalingDownCondition(),
Status: metav1.ConditionTrue,
Reason: "RemovingPods",
Message: "removing pods",
})
err = removePods(ctx, server, cmpBase)
}

// Update condition if state has progressed to pod creation
if ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForPodsCreation {
case ytv1.UpdateStateWaitingForPodsCreation:
ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: cmp.GetLabeller().GetScalingUpCondition(),
Status: metav1.ConditionTrue,
Reason: "CreatingPods",
Message: "creating new pods",
})
}
// Let the component handle its own post-removal / pod-creation logic.
return nil, err

return ptr.To(ComponentStatusUpdateStep("pods removal")), err
default:
// Not in the pod removal phase, let the component handle other update states
return nil, err
}
}

// handleOnDeleteUpdatingClusterState handles the OnDelete mode where pods must be manually deleted by the user.
Expand Down Expand Up @@ -421,7 +421,8 @@ func doesComponentUseNewUpdateMode(ytsaurus *apiproxy.Ytsaurus, componentType co
func getComponentUpdateStrategy(ytsaurus *apiproxy.Ytsaurus, componentType consts.ComponentType, componentName string) ytv1.ComponentUpdateModeType {
for _, selector := range ytsaurus.GetResource().Spec.UpdatePlan {
if selector.Component.Type == componentType &&
(selector.Component.Name == "" || selector.Component.Name == componentName) {
(selector.Component.Name == "" || selector.Component.Name == componentName) &&
selector.Strategy != nil {
return selector.Strategy.Type()
}
}
Expand Down
Loading
Loading