Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/subflow_run_e2e_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ jobs:

strategy:
fail-fast: false
max-parallel: 5
max-parallel: 6
matrix:
e2e:
- name: "update basic"
filter: "update && basic"
- name: "update plan"
filter: "update && plan"
filter: "update && plan && !strategy"
- name: "update strategy"
filter: "update && plan && strategy"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. This escalates quickly.

Probably we could reuse one cluster and run several "updates" on it in random order.

I don't want "mock" clusters or invent fictional unit-test but bootstrapping took most of the time. Probably we could snapshot state and recreate it much faster.

- name: "update other"
filter: "update && !(basic || plan)"
- name: "tls"
Expand Down
6 changes: 3 additions & 3 deletions controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func NewComponentManager(
return allComponents
}

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
}
yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0], getAllComponents)
d := components.NewDiscovery(cfgen, ytsaurus, yc)

var dnds []components.Component
if len(resource.Spec.DataNodes) > 0 {
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewComponentManager(
}

if resource.Spec.ControllerAgents != nil {
ca := components.NewControllerAgent(cfgen, ytsaurus, m)
ca := components.NewControllerAgent(cfgen, ytsaurus, m, yc)
allComponents = append(allComponents, ca)
}

Expand All @@ -147,7 +147,7 @@ func NewComponentManager(
}

if resource.Spec.MasterCaches != nil {
mc := components.NewMasterCache(cfgen, ytsaurus)
mc := components.NewMasterCache(cfgen, ytsaurus, yc)
allComponents = append(allComponents, mc)
}

Expand Down
86 changes: 81 additions & 5 deletions pkg/components/controller_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package components

import (
"context"
"fmt"

"go.ytsaurus.tech/yt/go/ypath"
"go.ytsaurus.tech/yt/go/yt"
corev1 "k8s.io/api/core/v1"

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
Expand All @@ -14,11 +17,12 @@ import (

type ControllerAgent struct {
localServerComponent
cfgen *ytconfig.Generator
master Component
cfgen *ytconfig.Generator
master Component
ytsaurusClient internalYtsaurusClient
}

func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *ControllerAgent {
func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component, yc internalYtsaurusClient) *ControllerAgent {
l := cfgen.GetComponentLabeller(consts.ControllerAgentType, "")
resource := ytsaurus.GetResource()

Expand All @@ -41,6 +45,7 @@ func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus,
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
master: master,
ytsaurusClient: yc,
}
}

Expand All @@ -56,8 +61,23 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu
}

if ca.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(ca.ytsaurus, ca) {
switch getComponentUpdateStrategy(ca.ytsaurus, consts.ControllerAgentType, ca.GetShortName()) {
case ytv1.ComponentUpdateModeTypeOnDelete:
if status, err := handleOnDeleteUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
}
default:
if status, err := handleBulkUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
}
}

if ca.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation {
return ComponentStatusReady(), err
}
} else {
return ComponentStatusReadyAfter("Not updating component"), nil
}
}

Expand Down Expand Up @@ -91,3 +111,59 @@ func (ca *ControllerAgent) Sync(ctx context.Context) error {
_, err := ca.doSync(ctx, false)
return err
}

type ControllerAgentsWithMaintenance struct {
Address string `yson:",value"`
Maintenance bool `yson:"maintenance,attr"`
Alerts []string `yson:"alerts,attr"`
}

func (ca *ControllerAgent) UpdatePreCheck(ctx context.Context) ComponentStatus {
if ca.ytsaurusClient == nil {
return ComponentStatusBlocked("YtsaurusClient component is not available")
}

ytClient := ca.ytsaurusClient.GetYtClient()
if ytClient == nil {
return ComponentStatusBlocked("YT client is not available")
}

// Check that the number of instances in YT matches the expected instanceCount
expectedCount := int(ca.ytsaurus.GetResource().Spec.ControllerAgents.InstanceCount)
if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.ControllerAgentType, expectedCount); err != nil {
return ComponentStatusBlocked(err.Error())
}

controllerAgentsWithMaintenance := make([]ControllerAgentsWithMaintenance, 0)
cypressPath := consts.ComponentCypressPath(consts.ControllerAgentType)

err := ca.ytsaurusClient.GetYtClient().ListNode(ctx, ypath.Path(cypressPath), &controllerAgentsWithMaintenance, &yt.ListNodeOptions{
Attributes: []string{"maintenance", "alerts"}})
if err != nil {
return ComponentStatusBlocked(err.Error())
}

for _, controllerAgent := range controllerAgentsWithMaintenance {
var connected bool
err = ca.ytsaurusClient.GetYtClient().GetNode(
ctx,
ypath.Path(fmt.Sprintf("%v/%v/orchid/controller_agent/service/connected", cypressPath, controllerAgent.Address)),
&connected,
nil)
if err != nil {
return ComponentStatusBlocked(err.Error())
}

if !connected {
msg := fmt.Sprintf("Controller agent is not connected: %v", controllerAgent.Address)
return ComponentStatusBlocked(msg)
}

if controllerAgent.Maintenance {
msg := fmt.Sprintf("There is a controller agent in maintenance: %v", controllerAgent.Address)
return ComponentStatusBlocked(msg)
}
}

return ComponentStatusReady()
}
63 changes: 59 additions & 4 deletions pkg/components/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package components

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"

Expand All @@ -10,14 +11,16 @@ import (
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/resources"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/ytconfig"
"go.ytsaurus.tech/yt/go/ypath"
)

type Discovery struct {
localServerComponent
cfgen *ytconfig.Generator
cfgen *ytconfig.Generator
ytsaurusClient internalYtsaurusClient
}

func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Discovery {
func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient) *Discovery {
l := cfgen.GetComponentLabeller(consts.DiscoveryType, "")
resource := ytsaurus.GetResource()

Expand All @@ -41,6 +44,7 @@ func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Disco
return &Discovery{
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
ytsaurusClient: yc,
}
}

Expand All @@ -56,8 +60,23 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
}

if d.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(d.ytsaurus, d) {
switch getComponentUpdateStrategy(d.ytsaurus, consts.DiscoveryType, d.GetShortName()) {
case ytv1.ComponentUpdateModeTypeOnDelete:
if status, err := handleOnDeleteUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
}
default:
if status, err := handleBulkUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
}
}

if d.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation {
return ComponentStatusReady(), err
}
} else {
return ComponentStatusReadyAfter("Not updating component"), nil
}
}

Expand All @@ -83,3 +102,39 @@ func (d *Discovery) Sync(ctx context.Context) error {
_, err := d.doSync(ctx, false)
return err
}

func (d *Discovery) UpdatePreCheck(ctx context.Context) ComponentStatus {
if d.ytsaurusClient == nil {
return ComponentStatusBlocked("YtsaurusClient component is not available")
}

ytClient := d.ytsaurusClient.GetYtClient()
if ytClient == nil {
return ComponentStatusBlocked("YT client is not available")
}

// Check that the number of instances in YT matches the expected instanceCount
expectedCount := int(d.ytsaurus.GetResource().Spec.Discovery.InstanceCount)
if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.DiscoveryType, expectedCount); err != nil {
return ComponentStatusBlocked(err.Error())
}

var discoveryServers []string
cypressPath := consts.ComponentCypressPath(consts.DiscoveryType)

err := d.ytsaurusClient.GetYtClient().ListNode(ctx, ypath.Path(cypressPath), &discoveryServers, nil)
if err != nil {
return ComponentStatusBlocked(err.Error())
}

// Check that all discovery_servers are alive
for _, server := range discoveryServers {
orchidPath := ypath.Path(fmt.Sprintf("%s/%s/orchid", cypressPath, server))
var orchidData map[string]interface{}
if err := ytClient.GetNode(ctx, orchidPath, &orchidData, nil); err != nil {
return ComponentStatusBlocked(fmt.Sprintf("Failed to get discovery orchid data for %s: %v", server, err))
}
}

return ComponentStatusReady()
}
44 changes: 40 additions & 4 deletions pkg/components/master_caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (

type MasterCache struct {
localServerComponent
cfgen *ytconfig.Generator
cfgen *ytconfig.Generator
ytsaurusClient internalYtsaurusClient
}

func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *MasterCache {
func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient) *MasterCache {
l := cfgen.GetComponentLabeller(consts.MasterCacheType, "")

resource := ytsaurus.GetResource()
Expand All @@ -39,6 +40,7 @@ func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Mas
return &MasterCache{
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
ytsaurusClient: yc,
}
}

Expand All @@ -54,8 +56,23 @@ func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, e
}

if mc.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(mc.ytsaurus, mc) {
switch getComponentUpdateStrategy(mc.ytsaurus, consts.MasterCacheType, mc.GetShortName()) {
case ytv1.ComponentUpdateModeTypeOnDelete:
if status, err := handleOnDeleteUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil {
return *status, err
}
default:
if status, err := handleBulkUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil {
return *status, err
}
}

if mc.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation {
return ComponentStatusReady(), err
}
} else {
return ComponentStatusReadyAfter("Not updating component"), nil
}
}

Expand Down Expand Up @@ -99,3 +116,22 @@ func (mc *MasterCache) getHostAddressLabel() string {
}
return defaultHostAddressLabel
}

func (mc *MasterCache) UpdatePreCheck(ctx context.Context) ComponentStatus {
if mc.ytsaurusClient == nil {
return ComponentStatusBlocked("YtsaurusClient component is not available")
}

ytClient := mc.ytsaurusClient.GetYtClient()
if ytClient == nil {
return ComponentStatusBlocked("YT client is not available")
}

// Check that the number of instances in YT matches the expected instanceCount
expectedCount := int(mc.ytsaurus.GetResource().Spec.MasterCaches.InstanceCount)
if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.MasterCacheType, expectedCount); err != nil {
return ComponentStatusBlocked(err.Error())
}

return ComponentStatusReady()
}
Loading
Loading