Merge pull request rook#2986 from rohantmp/mainMon
Ceph: Set noout on cluster when storage nodes are in maintenance
travisn authored Apr 17, 2019
2 parents 6996571 + 894f4bf commit b7a564e
Showing 8 changed files with 154 additions and 24 deletions.
1 change: 1 addition & 0 deletions PendingReleaseNotes.md
@@ -26,6 +26,7 @@
- Orchestration is automatically triggered when addition or removal of storage
devices is detected. This should remove the requirement of restarting the
operator to detect new devices.
- Rook will now set the `noout` flag on CephClusters that have OSD nodes tainted `NoSchedule`.

## Breaking Changes

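The new release note above boils down to a simple rule: `noout` should be set while at least one node carrying OSDs is unschedulable (for example, cordoned or tainted `NoSchedule` for maintenance), and cleared once every such node is schedulable again. A minimal, purely illustrative sketch of that rule (not code from this commit):

package example

// shouldSetNoOut captures the maintenance rule from the release note:
// noout is wanted whenever at least one node that carries OSDs is
// currently unschedulable.
func shouldSetNoOut(nodeSchedulable map[string]bool) bool {
	for _, schedulable := range nodeSchedulable {
		if !schedulable {
			return true
		}
	}
	return false
}
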
33 changes: 33 additions & 0 deletions pkg/daemon/ceph/client/osd.go
@@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/rook/rook/pkg/clusterd"
)
@@ -63,6 +64,18 @@ type OSDDump struct {
Up json.Number `json:"up"`
In json.Number `json:"in"`
} `json:"osds"`
Flags string `json:"flags"`
}

// IsFlagSet checks if an OSD flag is set
func (dump *OSDDump) IsFlagSet(checkFlag string) bool {
flags := strings.Split(dump.Flags, ",")
for _, flag := range flags {
if flag == checkFlag {
return true
}
}
return false
}

// StatusByID returns status and inCluster states for given OSD id
@@ -180,6 +193,26 @@ func EnableScrubbing(context *clusterd.Context, clusterName string) (string, err
return string(buf), nil
}

// SetNoOut sets the osd flag `noout`
func SetNoOut(context *clusterd.Context, clusterName string) error {
args := []string{"osd", "set", "noout"}
_, err := ExecuteCephCommand(context, clusterName, args)
if err != nil {
return fmt.Errorf("failed to set noout: %+v", err)
}
return nil
}

// UnsetNoOut unsets the osd flag `noout`
func UnsetNoOut(context *clusterd.Context, clusterName string) error {
args := []string{"osd", "unset", "noout"}
_, err := ExecuteCephCommand(context, clusterName, args)
if err != nil {
return fmt.Errorf("failed to unset noout: %+v", err)
}
return nil
}

func (usage *OSDUsage) ByID(osdID int) *OSDNodeUsage {
for i := range usage.OSDNodes {
if usage.OSDNodes[i].ID == osdID {
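
The three helpers added above are meant to be composed: GetOSDDump exposes the cluster's flag string (a comma-separated value such as "noout,sortbitwise"), IsFlagSet checks it, and SetNoOut/UnsetNoOut issue the actual mon commands. A hedged usage sketch; the ensureNoOut wrapper and its error messages are illustrative, not part of this change:

package example

import (
	"fmt"

	"github.com/rook/rook/pkg/clusterd"
	"github.com/rook/rook/pkg/daemon/ceph/client"
)

// ensureNoOut is a hypothetical wrapper showing how the helpers compose:
// read the current flags once, then only issue a set/unset command when the
// desired state differs from what the osd dump reports.
func ensureNoOut(context *clusterd.Context, clusterName string, want bool) error {
	dump, err := client.GetOSDDump(context, clusterName)
	if err != nil {
		return fmt.Errorf("failed to get osd dump: %+v", err)
	}
	if want && !dump.IsFlagSet("noout") {
		return client.SetNoOut(context, clusterName)
	}
	if !want && dump.IsFlagSet("noout") {
		return client.UnsetNoOut(context, clusterName)
	}
	return nil
}
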
88 changes: 83 additions & 5 deletions pkg/operator/ceph/cluster/controller.go
@@ -29,6 +29,7 @@ import (
cephbeta "github.com/rook/rook/pkg/apis/ceph.rook.io/v1beta1"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/daemon/ceph/agent/flexvolume/attachment"
"github.com/rook/rook/pkg/daemon/ceph/client"
discoverDaemon "github.com/rook/rook/pkg/daemon/discover"
"github.com/rook/rook/pkg/operator/ceph/cluster/mon"
"github.com/rook/rook/pkg/operator/ceph/cluster/osd"
@@ -366,16 +367,22 @@ func (c *ClusterController) onK8sNodeUpdate(oldObj, newObj interface{}) {
return
}

if k8sutil.GetNodeSchedulable(*newNode) == false {
logger.Debugf("Skipping cluster update. Updated node %s is unschedulable", newNode.Labels[apis.LabelHostname])
// set or unset noout depending on whether nodes are schedulable.
c.reconcileNodeMaintenance(newNode)

newNodeSchedulable := k8sutil.GetNodeSchedulable(*newNode)
oldNodeSchedulable := k8sutil.GetNodeSchedulable(*oldNode)

// Checking for NoSchedule added to storage node
if oldNodeSchedulable == false && newNodeSchedulable == false {
logger.Debugf("Skipping cluster update. Updated node %s was and is still unschedulable", newNode.Labels[apis.LabelHostname])
return
}

// Checking for nodes where NoSchedule-Taint got removed
if k8sutil.GetNodeSchedulable(*oldNode) == true {
if oldNodeSchedulable == true && newNodeSchedulable == true {
logger.Debugf("Skipping cluster update. Updated node %s was and is still schedulable", oldNode.Labels[apis.LabelHostname])
return
}
// Checking for nodes where NoSchedule-Taint got removed

for _, cluster := range c.clusterMap {
if cluster.Info == nil {
@@ -397,6 +404,77 @@ func (c *ClusterController) onK8sNodeUpdate(oldObj, newObj interface{}) {
}
}

// reconcileNodeMaintenance sets or unsets noout on the relevant clusters depending on whether their storage nodes are schedulable.
func (c *ClusterController) reconcileNodeMaintenance(updatedNode *v1.Node) {
clusters := c.getTenantClusters(updatedNode)

for _, cluster := range clusters {
if cluster.Info.IsInitialized() {
nodes, err := osd.GetAllStorageNodes(cluster.context, cluster.Namespace)
if err != nil {
logger.Errorf("Error getting all storage nodes for cluster: %v", err)
continue
}
allSchedulable := true
for _, node := range nodes {
if !k8sutil.GetNodeSchedulable(node) {
allSchedulable = false
break
}
}
osdDump, err := client.GetOSDDump(c.context, cluster.Info.Name)
if err != nil {
logger.Errorf("failed to get the noout value: %+v", err)
continue
}
nooutFlagSet := osdDump.IsFlagSet("noout")
if allSchedulable {
if nooutFlagSet {
logger.Infof("Unsetting noout because no storage nodes are in maintenance")
client.UnsetNoOut(c.context, cluster.Info.Name)
} else {
logger.Debugf("No storage nodes are in maintenance. Noout already unset.")
}
} else {
if !nooutFlagSet {
logger.Infof("Setting noout because a storage node is in maintenance")
client.SetNoOut(c.context, cluster.Info.Name)
} else {
logger.Debugf("A storage node is in maintenance. Noout already set.")
}
}
} else {
logger.Errorf("The cluster's info is uninitialized")
}
}

}

// getTenantClusters returns the clusters that have osds on the given node
func (c *ClusterController) getTenantClusters(node *v1.Node) []*cluster {
var clusters []*cluster
// list osd deployments for all namespaces
for namespace, clusterObj := range c.clusterMap {
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", osd.AppName)}
osdDeployments, err := c.context.Clientset.Apps().Deployments(namespace).List(listOpts)
if err != nil {
logger.Errorf("Failed to get deployments, %v", err)
continue
}
for _, osdDeployment := range osdDeployments.Items {
osdPodSpec := osdDeployment.Spec.Template.Spec
// get the node name from the node selector
nodeName, ok := osdPodSpec.NodeSelector[apis.LabelHostname]
if !ok || nodeName == "" {
logger.Errorf("osd deployment %s doesn't have a node name on its node selector: %+v", osdDeployment.Name, osdPodSpec.NodeSelector)
} else if nodeName == node.ObjectMeta.Name {
clusters = append(clusters, clusterObj)
break
}

}
}
return clusters

}

func (c *ClusterController) onUpdate(oldObj, newObj interface{}) {
oldClust, _, err := getClusterObject(oldObj)
if err != nil {
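
reconcileNodeMaintenance relies on k8sutil.GetNodeSchedulable to decide whether a node counts as being in maintenance. As a rough, hypothetical illustration only (the real k8sutil implementation may differ), such a check typically treats a cordoned node or a NoSchedule taint as unschedulable:

package example

import v1 "k8s.io/api/core/v1"

// nodeSchedulable is a hypothetical stand-in for k8sutil.GetNodeSchedulable,
// shown only to make the maintenance condition concrete: a node is treated
// as in maintenance when it is cordoned or carries a NoSchedule taint.
func nodeSchedulable(node v1.Node) bool {
	if node.Spec.Unschedulable {
		return false
	}
	for _, taint := range node.Spec.Taints {
		if taint.Effect == v1.TaintEffectNoSchedule {
			return false
		}
	}
	return true
}
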
30 changes: 24 additions & 6 deletions pkg/operator/ceph/cluster/osd/osd.go
@@ -45,7 +45,8 @@ import (
var logger = capnslog.NewPackageLogger("github.com/rook/rook", "op-osd")

const (
appName = "rook-ceph-osd"
// AppName is the value of the app label applied to osd resources
AppName = "rook-ceph-osd"
prepareAppName = "rook-ceph-osd-prepare"
prepareAppNameFmt = "rook-ceph-osd-prepare-%s"
legacyAppNameFmt = "rook-ceph-osd-id-%d"
@@ -422,12 +423,13 @@ func (c *Cluster) cleanupRemovedNode(config *provisionConfig, nodeName, crushNam
}
}

// discover nodes which currently have osds scheduled on them. Return a mapping of
// node names -> a list of osd deployments on the node
func (c *Cluster) discoverStorageNodes() (map[string][]*apps.Deployment, error) {
// DiscoverStorageNodes discovers nodes which currently have osds scheduled on them
// by a cluster (only one cluster per namespace is possible).
// Returns a mapping of node names -> a list of osd deployments on the node
func DiscoverStorageNodes(context *clusterd.Context, namespace string) (map[string][]*apps.Deployment, error) {

listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", appName)}
osdDeployments, err := c.context.Clientset.AppsV1().Deployments(c.Namespace).List(listOpts)
listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", AppName)}
osdDeployments, err := context.Clientset.AppsV1().Deployments(namespace).List(listOpts)
if err != nil {
return nil, fmt.Errorf("failed to list osd deployment: %+v", err)
}
@@ -453,6 +455,22 @@ func (c *Cluster) discoverStorageNodes() (map[string][]*apps.Deployment, error)
return discoveredNodes, nil
}

// GetAllStorageNodes returns the v1.Node objects this cluster has osds on
func GetAllStorageNodes(context *clusterd.Context, namespace string) ([]v1.Node, error) {
// generate a label selector from the hostnames of the discovered storage nodes
discoveredNodes, err := DiscoverStorageNodes(context, namespace)
if err != nil {
return nil, fmt.Errorf("failed to discover storage nodes: %+v", err)
}
nodeNameList := make([]string, 0, len(discoveredNodes))
for nodeName := range discoveredNodes {
nodeNameList = append(nodeNameList, nodeName)
}
labelSelector := fmt.Sprintf("kubernetes.io/hostname in (%v)", strings.Join(nodeNameList, ", "))

// get the node list
listOpts := metav1.ListOptions{LabelSelector: labelSelector}
nodeList, err := context.Clientset.Core().Nodes().List(listOpts)
if err != nil {
return nil, fmt.Errorf("failed to list storage nodes: %+v", err)
}
return nodeList.Items, nil
}

func (c *Cluster) isSafeToRemoveNode(nodeName string, osdDeployments []*apps.Deployment) error {
if err := client.IsClusterClean(c.context, c.Namespace); err != nil {
// the cluster isn't clean, it's not safe to remove this node
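
DiscoverStorageNodes and GetAllStorageNodes give other packages (here, the cluster controller) a way to map a Ceph cluster back to the Kubernetes nodes its OSDs run on. A hedged sketch of a caller, with assumed names and namespace, mirroring what reconcileNodeMaintenance does inline:

package example

import (
	"github.com/rook/rook/pkg/clusterd"
	"github.com/rook/rook/pkg/operator/ceph/cluster/osd"
	"github.com/rook/rook/pkg/operator/k8sutil"
)

// anyStorageNodeInMaintenance reports whether any node hosting this
// cluster's OSDs is currently unschedulable. Illustrative only.
func anyStorageNodeInMaintenance(context *clusterd.Context, namespace string) (bool, error) {
	nodes, err := osd.GetAllStorageNodes(context, namespace)
	if err != nil {
		return false, err
	}
	for _, node := range nodes {
		if !k8sutil.GetNodeSchedulable(node) {
			return true, nil
		}
	}
	return false, nil
}
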
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/osd/osd_test.go
@@ -154,7 +154,7 @@ func TestAddRemoveNode(t *testing.T) {
// simulate the OSD pod having been created
osdPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "osdPod",
Labels: map[string]string{k8sutil.AppAttr: appName}}}
Labels: map[string]string{k8sutil.AppAttr: AppName}}}
c.context.Clientset.CoreV1().Pods(c.Namespace).Create(osdPod)

// mock the ceph calls that will be called during remove node
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestDiscoverOSDs(t *testing.T) {
clientset := fake.NewSimpleClientset(d1, d2, d3)
c.context.Clientset = clientset

discovered, err := c.discoverStorageNodes()
discovered, err := DiscoverStorageNodes(c.context, c.Namespace)
require.Nil(t, err)
assert.Equal(t, 2, len(discovered))

10 changes: 5 additions & 5 deletions pkg/operator/ceph/cluster/osd/spec.go
@@ -260,15 +260,15 @@ func (c *Cluster) makeDeployment(nodeName string, selection rookalpha.Selection,
Name: fmt.Sprintf(osdAppNameFmt, osd.ID),
Namespace: c.Namespace,
Labels: map[string]string{
k8sutil.AppAttr: appName,
k8sutil.AppAttr: AppName,
k8sutil.ClusterAttr: c.Namespace,
osdLabelKey: fmt.Sprintf("%d", osd.ID),
},
},
Spec: apps.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
k8sutil.AppAttr: appName,
k8sutil.AppAttr: AppName,
k8sutil.ClusterAttr: c.Namespace,
osdLabelKey: fmt.Sprintf("%d", osd.ID),
},
@@ -278,9 +278,9 @@ func (c *Cluster) makeDeployment(nodeName string, selection rookalpha.Selection,
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Name: AppName,
Labels: map[string]string{
k8sutil.AppAttr: appName,
k8sutil.AppAttr: AppName,
k8sutil.ClusterAttr: c.Namespace,
osdLabelKey: fmt.Sprintf("%d", osd.ID),
},
@@ -395,7 +395,7 @@ func (c *Cluster) provisionPodTemplateSpec(devices []rookalpha.Device, selection
c.placement.ApplyToPodSpec(&podSpec)

podMeta := metav1.ObjectMeta{
Name: appName,
Name: AppName,
Labels: map[string]string{
k8sutil.AppAttr: prepareAppName,
k8sutil.ClusterAttr: c.Namespace,
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/osd/spec_test.go
@@ -113,8 +113,8 @@ func testPodDevices(t *testing.T, dataDir, deviceName string, allDevices bool) {
assert.Equal(t, "rook-data", deployment.Spec.Template.Spec.Volumes[0].Name)
assert.Equal(t, "ceph-default-config-dir", deployment.Spec.Template.Spec.Volumes[1].Name)

assert.Equal(t, appName, deployment.Spec.Template.ObjectMeta.Name)
assert.Equal(t, appName, deployment.Spec.Template.ObjectMeta.Labels["app"])
assert.Equal(t, AppName, deployment.Spec.Template.ObjectMeta.Name)
assert.Equal(t, AppName, deployment.Spec.Template.ObjectMeta.Labels["app"])
assert.Equal(t, c.Namespace, deployment.Spec.Template.ObjectMeta.Labels["rook_cluster"])
assert.Equal(t, 0, len(deployment.Spec.Template.ObjectMeta.Annotations))

8 changes: 4 additions & 4 deletions pkg/operator/ceph/cluster/osd/status.go
@@ -25,8 +25,8 @@ import (
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
@@ -66,7 +66,7 @@ func (c *Cluster) updateNodeStatus(node string, status OrchestrationStatus) erro

func UpdateNodeStatus(kv *k8sutil.ConfigMapKVStore, node string, status OrchestrationStatus) error {
labels := map[string]string{
k8sutil.AppAttr: appName,
k8sutil.AppAttr: AppName,
orchestrationStatusKey: provisioningLabelKey,
nodeLabelKey: node,
}
@@ -162,7 +162,7 @@ func (c *Cluster) checkNodesCompleted(selector string, config *provisionConfig,

func (c *Cluster) completeOSDsForAllNodes(config *provisionConfig, configOSDs bool, timeoutMinutes int) bool {
selector := fmt.Sprintf("%s=%s,%s=%s",
k8sutil.AppAttr, appName,
k8sutil.AppAttr, AppName,
orchestrationStatusKey, provisioningLabelKey,
)

@@ -277,7 +277,7 @@ func (c *Cluster) findRemovedNodes() (map[string][]*apps.Deployment, error) {
removedNodes := map[string][]*apps.Deployment{}

// first discover the storage nodes that are still running
discoveredNodes, err := c.discoverStorageNodes()
discoveredNodes, err := DiscoverStorageNodes(c.context, c.Namespace)
if err != nil {
return nil, fmt.Errorf("aborting search for removed nodes. failed to discover storage nodes. %+v", err)
}
