Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func NewComponentManager(
return allComponents
}

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
}
yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0], getAllComponents)
d := components.NewDiscovery(cfgen, ytsaurus, yc)

var dnds []components.Component
if len(resource.Spec.DataNodes) > 0 {
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewComponentManager(
}

if resource.Spec.ControllerAgents != nil {
ca := components.NewControllerAgent(cfgen, ytsaurus, m)
ca := components.NewControllerAgent(cfgen, ytsaurus, m, yc)
allComponents = append(allComponents, ca)
}

Expand All @@ -147,7 +147,7 @@ func NewComponentManager(
}

if resource.Spec.MasterCaches != nil {
mc := components.NewMasterCache(cfgen, ytsaurus)
mc := components.NewMasterCache(cfgen, ytsaurus, yc)
allComponents = append(allComponents, mc)
}

Expand Down
86 changes: 81 additions & 5 deletions pkg/components/controller_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package components

import (
"context"
"fmt"

"go.ytsaurus.tech/yt/go/ypath"
"go.ytsaurus.tech/yt/go/yt"
corev1 "k8s.io/api/core/v1"

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
Expand All @@ -14,11 +17,12 @@ import (

type ControllerAgent struct {
localServerComponent
cfgen *ytconfig.Generator
master Component
cfgen *ytconfig.Generator
master Component
ytsaurusClient internalYtsaurusClient
}

func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *ControllerAgent {
func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component, yc internalYtsaurusClient) *ControllerAgent {
l := cfgen.GetComponentLabeller(consts.ControllerAgentType, "")
resource := ytsaurus.GetResource()

Expand All @@ -41,6 +45,7 @@ func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus,
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
master: master,
ytsaurusClient: yc,
}
}

Expand All @@ -56,8 +61,23 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu
}

if ca.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(ca.ytsaurus, ca) {
switch getComponentUpdateStrategy(ca.ytsaurus, consts.ControllerAgentType, ca.GetShortName()) {
case ytv1.ComponentUpdateModeTypeOnDelete:
if status, err := handleOnDeleteUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
}
default:
if status, err := handleBulkUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
return *status, err
}
}

if ca.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation {
return ComponentStatusReady(), err
}
} else {
return ComponentStatusReadyAfter("Not updating component"), nil
}
}

Expand Down Expand Up @@ -91,3 +111,59 @@ func (ca *ControllerAgent) Sync(ctx context.Context) error {
_, err := ca.doSync(ctx, false)
return err
}

type ControllerAgentsWithMaintenance struct {
Address string `yson:",value"`
Maintenance bool `yson:"maintenance,attr"`
Alerts []string `yson:"alerts,attr"`
}

func (ca *ControllerAgent) UpdatePreCheck(ctx context.Context) ComponentStatus {
if ca.ytsaurusClient == nil {
return ComponentStatusBlocked("YtsaurusClient component is not available")
}

ytClient := ca.ytsaurusClient.GetYtClient()
if ytClient == nil {
return ComponentStatusBlocked("YT client is not available")
}

// Check that the number of instances in YT matches the expected instanceCount
expectedCount := int(ca.ytsaurus.GetResource().Spec.ControllerAgents.InstanceCount)
if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.ControllerAgentType, expectedCount); err != nil {
return ComponentStatusBlocked(err.Error())
}

controllerAgentsWithMaintenance := make([]ControllerAgentsWithMaintenance, 0)
cypressPath := consts.ComponentCypressPath(consts.ControllerAgentType)

err := ca.ytsaurusClient.GetYtClient().ListNode(ctx, ypath.Path(cypressPath), &controllerAgentsWithMaintenance, &yt.ListNodeOptions{
Attributes: []string{"maintenance", "alerts"}})
if err != nil {
return ComponentStatusBlocked(err.Error())
}

for _, controllerAgent := range controllerAgentsWithMaintenance {
var connected bool
err = ca.ytsaurusClient.GetYtClient().GetNode(
ctx,
ypath.Path(fmt.Sprintf("%v/%v/orchid/controller_agent/service/connected", cypressPath, controllerAgent.Address)),
&connected,
nil)
if err != nil {
return ComponentStatusBlocked(err.Error())
}

if !connected {
msg := fmt.Sprintf("Controller agent is not connected: %v", controllerAgent.Address)
return ComponentStatusBlocked(msg)
}

if controllerAgent.Maintenance {
msg := fmt.Sprintf("There is a controller agent in maintenance: %v", controllerAgent.Address)
return ComponentStatusBlocked(msg)
}
}

return ComponentStatusReady()
}
63 changes: 59 additions & 4 deletions pkg/components/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package components

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"

Expand All @@ -10,14 +11,16 @@ import (
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/resources"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/ytconfig"
"go.ytsaurus.tech/yt/go/ypath"
)

type Discovery struct {
localServerComponent
cfgen *ytconfig.Generator
cfgen *ytconfig.Generator
ytsaurusClient internalYtsaurusClient
}

func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Discovery {
func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient) *Discovery {
l := cfgen.GetComponentLabeller(consts.DiscoveryType, "")
resource := ytsaurus.GetResource()

Expand All @@ -41,6 +44,7 @@ func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Disco
return &Discovery{
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
ytsaurusClient: yc,
}
}

Expand All @@ -56,8 +60,23 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
}

if d.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(d.ytsaurus, d) {
switch getComponentUpdateStrategy(d.ytsaurus, consts.DiscoveryType, d.GetShortName()) {
case ytv1.ComponentUpdateModeTypeOnDelete:
if status, err := handleOnDeleteUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
}
default:
if status, err := handleBulkUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
return *status, err
}
}

if d.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation {
return ComponentStatusReady(), err
}
} else {
return ComponentStatusReadyAfter("Not updating component"), nil
}
}

Expand All @@ -83,3 +102,39 @@ func (d *Discovery) Sync(ctx context.Context) error {
_, err := d.doSync(ctx, false)
return err
}

func (d *Discovery) UpdatePreCheck(ctx context.Context) ComponentStatus {
if d.ytsaurusClient == nil {
return ComponentStatusBlocked("YtsaurusClient component is not available")
}

ytClient := d.ytsaurusClient.GetYtClient()
if ytClient == nil {
return ComponentStatusBlocked("YT client is not available")
}

// Check that the number of instances in YT matches the expected instanceCount
expectedCount := int(d.ytsaurus.GetResource().Spec.Discovery.InstanceCount)
if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.DiscoveryType, expectedCount); err != nil {
return ComponentStatusBlocked(err.Error())
}

var discoveryServers []string
cypressPath := consts.ComponentCypressPath(consts.DiscoveryType)

err := d.ytsaurusClient.GetYtClient().ListNode(ctx, ypath.Path(cypressPath), &discoveryServers, nil)
if err != nil {
return ComponentStatusBlocked(err.Error())
}

// Check that all discovery_servers are alive
for _, server := range discoveryServers {
orchidPath := ypath.Path(fmt.Sprintf("%s/%s/orchid", cypressPath, server))
var orchidData map[string]interface{}
if err := ytClient.GetNode(ctx, orchidPath, &orchidData, nil); err != nil {
return ComponentStatusBlocked(fmt.Sprintf("Failed to get discovery orchid data for %s: %v", server, err))
}
}

return ComponentStatusReady()
}
44 changes: 40 additions & 4 deletions pkg/components/master_caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (

type MasterCache struct {
localServerComponent
cfgen *ytconfig.Generator
cfgen *ytconfig.Generator
ytsaurusClient internalYtsaurusClient
}

func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *MasterCache {
func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient) *MasterCache {
l := cfgen.GetComponentLabeller(consts.MasterCacheType, "")

resource := ytsaurus.GetResource()
Expand All @@ -39,6 +40,7 @@ func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Mas
return &MasterCache{
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
ytsaurusClient: yc,
}
}

Expand All @@ -54,8 +56,23 @@ func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, e
}

if mc.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(mc.ytsaurus, mc) {
switch getComponentUpdateStrategy(mc.ytsaurus, consts.MasterCacheType, mc.GetShortName()) {
case ytv1.ComponentUpdateModeTypeOnDelete:
if status, err := handleOnDeleteUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil {
return *status, err
}
default:
if status, err := handleBulkUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil {
return *status, err
}
}

if mc.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation {
return ComponentStatusReady(), err
}
} else {
return ComponentStatusReadyAfter("Not updating component"), nil
}
}

Expand Down Expand Up @@ -99,3 +116,22 @@ func (mc *MasterCache) getHostAddressLabel() string {
}
return defaultHostAddressLabel
}

func (mc *MasterCache) UpdatePreCheck(ctx context.Context) ComponentStatus {
if mc.ytsaurusClient == nil {
return ComponentStatusBlocked("YtsaurusClient component is not available")
}

ytClient := mc.ytsaurusClient.GetYtClient()
if ytClient == nil {
return ComponentStatusBlocked("YT client is not available")
}

// Check that the number of instances in YT matches the expected instanceCount
expectedCount := int(mc.ytsaurus.GetResource().Spec.MasterCaches.InstanceCount)
if err := IsInstanceCountEqualYTSpec(ctx, ytClient, consts.MasterCacheType, expectedCount); err != nil {
return ComponentStatusBlocked(err.Error())
}

return ComponentStatusReady()
}
10 changes: 10 additions & 0 deletions test/e2e/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ func getChangedPods(before, after map[string]corev1.Pod) changedObjects {
return ret
}

func getExpectedUpdatedPods(podsBeforeUpdate map[string]corev1.Pod, stsName string) []string {
expectedUpdated := make([]string, 0)
for name := range podsBeforeUpdate {
if strings.HasPrefix(name, stsName+"-") {
expectedUpdated = append(expectedUpdated, name)
}
}
return expectedUpdated
}

// updateSpecToTriggerAllComponentUpdate is a helper
// that introduce spec change which should trigger change in all component static configs
// and thus trigger all components update.
Expand Down
Loading
Loading