diff --git a/helm/spark-operator/templates/operator.yaml b/helm/spark-operator/templates/operator.yaml index aba288d3..3d23c2c8 100644 --- a/helm/spark-operator/templates/operator.yaml +++ b/helm/spark-operator/templates/operator.yaml @@ -31,7 +31,7 @@ metadata: {{- end }} rules: -- apiGroups: [""] +- apiGroups: ["", "apps"] - resources: ["pods", "replicationcontrollers", "services", "configmaps"] + resources: ["pods", "replicationcontrollers", "statefulsets", "services", "configmaps"] verbs: ["create", "delete", "deletecollection", "get", "list", "update", "watch", "patch"] --- apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/manifest/olm/crd/sparkclusteroperator.1.0.1.clusterserviceversion.yaml b/manifest/olm/crd/sparkclusteroperator.1.0.1.clusterserviceversion.yaml index ab78f27e..e0f7c454 100644 --- a/manifest/olm/crd/sparkclusteroperator.1.0.1.clusterserviceversion.yaml +++ b/manifest/olm/crd/sparkclusteroperator.1.0.1.clusterserviceversion.yaml @@ -91,7 +91,7 @@ spec: verbs: - "*" -- apiGroups: [""] +- apiGroups: ["", "apps"] - resources: ["pods", "replicationcontrollers", "services", "configmaps"] + resources: ["pods", "replicationcontrollers", "statefulsets", "services", "configmaps"] verbs: ["create", "delete", "deletecollection", "get", "list", "update", "watch", "patch"] - apiGroups: - apiextensions.k8s.io diff --git a/manifest/operator-cm.yaml b/manifest/operator-cm.yaml index 6130b14a..6e5d372c 100644 --- a/manifest/operator-cm.yaml +++ b/manifest/operator-cm.yaml @@ -9,7 +9,7 @@ metadata: name: edit-resources rules: -- apiGroups: [""] +- apiGroups: ["", "apps"] - resources: ["pods", "replicationcontrollers", "services", "configmaps"] + resources: ["pods", "replicationcontrollers", "statefulsets", "services", "configmaps"] verbs: ["create", "delete", "deletecollection", "get", "list", "update", "watch", "patch"] --- apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/manifest/operator.yaml b/manifest/operator.yaml index b02d280e..c4bfc8c2 100644 --- a/manifest/operator.yaml +++ b/manifest/operator.yaml @@ -37,7 
+37,7 @@ spec: serviceAccountName: spark-operator containers: - name: spark-operator - image: quay.io/radanalyticsio/spark-operator:latest-released + image: quay.io/radanalyticsio/spark-operator:latest env: - name: WATCH_NAMESPACE # if not specified all the namespaces will be watched; ~ denotes the same ns as the operator's value: "~" @@ -60,5 +60,5 @@ spec: limits: memory: "512Mi" cpu: "1000m" - imagePullPolicy: Never + imagePullPolicy: Always diff --git a/src/main/java/io/radanalytics/operator/app/AppOperator.java b/src/main/java/io/radanalytics/operator/app/AppOperator.java index a49012ca..c7ee8f46 100644 --- a/src/main/java/io/radanalytics/operator/app/AppOperator.java +++ b/src/main/java/io/radanalytics/operator/app/AppOperator.java @@ -38,6 +38,7 @@ protected void onDelete(SparkApplication app) { String name = app.getName(); client.services().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete(); client.replicationControllers().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete(); + client.apps().statefulSets().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete(); client.pods().inNamespace(namespace).withLabels(deployer.getLabelsForDeletion(name)).delete(); } } diff --git a/src/main/java/io/radanalytics/operator/cluster/InitContainersHelper.java b/src/main/java/io/radanalytics/operator/cluster/InitContainersHelper.java index cf719eca..c52e6421 100644 --- a/src/main/java/io/radanalytics/operator/cluster/InitContainersHelper.java +++ b/src/main/java/io/radanalytics/operator/cluster/InitContainersHelper.java @@ -1,6 +1,7 @@ package io.radanalytics.operator.cluster; import io.fabric8.kubernetes.api.model.*; +import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.radanalytics.operator.historyServer.HistoryServerHelper; import io.radanalytics.types.*; @@ -34,16 +35,16 @@ public class InitContainersHelper { * * * - * @param rc ReplicationController instance + * @param ss StatefulSet 
instance * @param cluster SparkCluster instance * @param cmExists whether config map with overrides exists - * @return modified ReplicationController instance + * @return modified StatefulSet instance */ - public static final ReplicationController addInitContainers(ReplicationController rc, - SparkCluster cluster, - boolean cmExists, - boolean isMaster) { - PodSpec podSpec = rc.getSpec().getTemplate().getSpec(); + public static final StatefulSet addInitContainers(StatefulSet ss, + SparkCluster cluster, + boolean cmExists, + boolean isMaster) { + PodSpec podSpec = ss.getSpec().getTemplate().getSpec(); if (isMaster && HistoryServerHelper.needsVolume(cluster)) { createChmodHistoryServerContainer(cluster, podSpec); @@ -57,8 +58,8 @@ public static final ReplicationController addInitContainers(ReplicationControlle createConfigOverrideContainer(cluster, podSpec, cmExists); } - rc.getSpec().getTemplate().setSpec(podSpec); - return rc; + ss.getSpec().getTemplate().setSpec(podSpec); + return ss; } private static Container createDownloader(SparkCluster cluster, PodSpec podSpec) { diff --git a/src/main/java/io/radanalytics/operator/cluster/KubernetesSparkClusterDeployer.java b/src/main/java/io/radanalytics/operator/cluster/KubernetesSparkClusterDeployer.java index 56cc6417..1560f87d 100644 --- a/src/main/java/io/radanalytics/operator/cluster/KubernetesSparkClusterDeployer.java +++ b/src/main/java/io/radanalytics/operator/cluster/KubernetesSparkClusterDeployer.java @@ -1,10 +1,15 @@ package io.radanalytics.operator.cluster; import io.fabric8.kubernetes.api.model.*; +import io.fabric8.kubernetes.api.model.apps.StatefulSet; +import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder; +import io.fabric8.kubernetes.api.model.apps.StatefulSetFluent; +import io.fabric8.kubernetes.api.model.apps.StatefulSetSpecFluent; import io.fabric8.kubernetes.client.KubernetesClient; import io.radanalytics.operator.historyServer.HistoryServerHelper; import 
io.radanalytics.operator.resource.LabelsHelper; import io.radanalytics.types.*; +import io.radanalytics.types.PersistentVolume; import java.util.*; @@ -34,10 +39,10 @@ public KubernetesResourceList getResourceList(SparkCluster cluster) { if (cluster.getMaster() != null && cluster.getMaster().getLabels() != null) allMasterLabels.putAll(cluster.getMaster().getLabels()); - ReplicationController masterRc = getRCforMaster(cluster); - ReplicationController workerRc = getRCforWorker(cluster); + StatefulSet masterSs = getSSforMaster(cluster); + StatefulSet workerSs = getSSforWorker(cluster); Service masterService = getService(false, name, 7077, allMasterLabels); - List list = new ArrayList<>(Arrays.asList(masterRc, workerRc, masterService)); + List list = new ArrayList<>(Arrays.asList(masterSs, workerSs, masterService)); if (cluster.getSparkWebUI()) { Service masterUiService = getService(true, name, 8080, allMasterLabels); list.add(masterUiService); @@ -45,20 +50,44 @@ public KubernetesResourceList getResourceList(SparkCluster cluster) { // pvc for history server (in case of sharedVolume strategy) if (HistoryServerHelper.needsVolume(cluster)) { - PersistentVolumeClaim pvc = getPersistentVolumeClaim(cluster, getDefaultLabels(name)); + SharedVolume sharedVolume = Optional.ofNullable(cluster.getHistoryServer().getSharedVolume()).orElse(new SharedVolume()); + Map matchLabels = sharedVolume.getMatchLabels(); + if (null == matchLabels || matchLabels.isEmpty()) { + // if no match labels are specified, we assume the default one: radanalytics.io/SparkCluster: spark-cluster-name + matchLabels = new HashMap<>(1); + matchLabels.put(prefix + entityName, cluster.getName()); + } + PersistentVolumeClaim pvc = getPersistentVolumeClaim(cluster.getName() + "-hs-claim", getDefaultLabels(name), matchLabels, sharedVolume.getSize(), "ReadWriteMany"); list.add(pvc); } + + // pvcs for masters/workers + List pvcToBeAdded = new ArrayList<>(); + if (cluster.getMaster() != null) { + 
pvcToBeAdded.addAll(cluster.getMaster().getPersistentVolumes()); + } + if (cluster.getWorker() != null) { + pvcToBeAdded.addAll(cluster.getWorker().getPersistentVolumes()); + } + +// if (!pvcToBeAdded.isEmpty()) { +// for (PersistentVolume pv : pvcToBeAdded) { +// String pvName = cluster.getName() + "-" + Optional.ofNullable(pv.getName()).orElse(UUID.randomUUID().toString()) + "-claim"; +// PersistentVolumeClaim pvc = getPersistentVolumeClaim(pvName, getDefaultLabels(name), pv.getMatchLabels(), pv.getSize(), "ReadWriteOnce"); +// list.add(pvc); +// } +// } KubernetesList resources = new KubernetesListBuilder().withItems(list).build(); return resources; } } - private ReplicationController getRCforMaster(SparkCluster cluster) { - return getRCforMasterOrWorker(true, cluster); + private StatefulSet getSSforMaster(SparkCluster cluster) { + return getSSforMasterOrWorker(true, cluster); } - private ReplicationController getRCforWorker(SparkCluster cluster) { - return getRCforMasterOrWorker(false, cluster); + private StatefulSet getSSforWorker(SparkCluster cluster) { + return getSSforMasterOrWorker(false, cluster); } private Service getService(boolean isUi, String name, int port, Map allMasterLabels) { @@ -78,7 +107,7 @@ public static EnvVar env(String key, String value) { return new EnvVarBuilder().withName(key).withValue(value).build(); } - private ReplicationController getRCforMasterOrWorker(boolean isMaster, SparkCluster cluster) { + private StatefulSet getSSforMasterOrWorker(boolean isMaster, SparkCluster cluster) { String name = cluster.getName(); String podName = name + (isMaster ? "-m" : "-w"); Map selector = getSelector(name, podName); @@ -166,7 +195,7 @@ private ReplicationController getRCforMasterOrWorker(boolean isMaster, SparkClus podLabels.put(prefix + LabelsHelper.OPERATOR_POD_TYPE_LABEL, isMaster ? 
OPERATOR_TYPE_MASTER_LABEL : OPERATOR_TYPE_WORKER_LABEL); addLabels(podLabels, cluster, isMaster); - PodTemplateSpecFluent.SpecNested<ReplicationControllerSpecFluent.TemplateNested<ReplicationControllerFluent.SpecNested<ReplicationControllerBuilder>>> rcBuilder = new ReplicationControllerBuilder().withNewMetadata() + PodTemplateSpecFluent.SpecNested<StatefulSetSpecFluent.TemplateNested<StatefulSetFluent.SpecNested<StatefulSetBuilder>>> ssBuilder = new StatefulSetBuilder().withNewMetadata() .withName(podName).withLabels(labels) .endMetadata() - .withNewSpec().withReplicas( + .withNewSpec().withServiceName(podName).withReplicas( @@ -176,11 +205,17 @@ private ReplicationController getRCforMasterOrWorker(boolean isMaster, SparkClus : Optional.ofNullable(cluster.getWorker()).orElse(new Worker()).getInstances() ) - .withSelector(selector) + .withNewSelector().withMatchLabels(selector).endSelector() .withNewTemplate().withNewMetadata().withLabels(podLabels).endMetadata() .withNewSpec().withContainers(containerBuilder.build()); - ReplicationController rc = rcBuilder.endSpec().endTemplate().endSpec().build(); +// // TODO: transform the replication controllers into stateful sets if the PV is asked +// List volumes = getVolumes(cluster, isMaster); +// if (volumes != null) { +// ssBuilder.withVolumes(volumes); +// } + + StatefulSet ss = ssBuilder.endSpec().endTemplate().endSpec().build(); // history server if (isMaster && null != cluster.getHistoryServer()) { @@ -189,23 +224,36 @@ private ReplicationController getRCforMasterOrWorker(boolean isMaster, SparkClus // add init containers that will prepare the data on the nodes or override the configuration if (!cluster.getDownloadData().isEmpty() || !cluster.getSparkConfiguration().isEmpty() || cmExists) { - InitContainersHelper.addInitContainers(rc, cluster, cmExists, isMaster); + InitContainersHelper.addInitContainers(ss, cluster, cmExists, isMaster); } - return rc; + return ss; + } - private PersistentVolumeClaim getPersistentVolumeClaim(SparkCluster cluster, Map<String, String> labels) { - SharedVolume sharedVolume = Optional.ofNullable(cluster.getHistoryServer().getSharedVolume()).orElse(new SharedVolume()); - Map<String, Quantity> requests = new HashMap<>(); - requests.put("storage", new 
QuantityBuilder().withAmount(sharedVolume.getSize()).build()); - Map matchLabels = sharedVolume.getMatchLabels(); - if (null == matchLabels || matchLabels.isEmpty()) { - // if no match labels are specified, we assume the default one: radanalytics.io/SparkCluster: spark-cluster-name - matchLabels = new HashMap<>(1); - matchLabels.put(prefix + entityName, cluster.getName()); + private List getVolumes(SparkCluster cluster, boolean isMaster) { + if (isMaster) { + if (cluster.getMaster() == null) { + return null; + } +// cluster.getMaster().getPersistentVolumes() + } else { + if (cluster.getWorker() == null) { + return null; + } } - PersistentVolumeClaim pvc = new PersistentVolumeClaimBuilder().withNewMetadata().withName(cluster.getName() + "-claim").withLabels(labels).endMetadata() - .withNewSpec().withAccessModes("ReadWriteMany") + return null; + } + + private PersistentVolumeClaim getPersistentVolumeClaim(String name, + Map labels, + Map matchLabels, + String size, + String accessMode) { + Map requests = new HashMap<>(); + requests.put("storage", new QuantityBuilder().withAmount(size).build()); + + PersistentVolumeClaim pvc = new PersistentVolumeClaimBuilder().withNewMetadata().withName(name).withLabels(labels).endMetadata() + .withNewSpec().withAccessModes(accessMode) .withNewSelector().withMatchLabels(matchLabels).endSelector() .withNewResources().withRequests(requests).endResources().endSpec().build(); return pvc; diff --git a/src/main/java/io/radanalytics/operator/cluster/SparkClusterOperator.java b/src/main/java/io/radanalytics/operator/cluster/SparkClusterOperator.java index 45c50aeb..84c9147b 100644 --- a/src/main/java/io/radanalytics/operator/cluster/SparkClusterOperator.java +++ b/src/main/java/io/radanalytics/operator/cluster/SparkClusterOperator.java @@ -3,10 +3,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Functions; import com.google.common.collect.Sets; -import 
io.fabric8.kubernetes.api.model.DoneableReplicationController; import io.fabric8.kubernetes.api.model.KubernetesResourceList; -import io.fabric8.kubernetes.api.model.ReplicationController; -import io.fabric8.kubernetes.api.model.ReplicationControllerList; +import io.fabric8.kubernetes.api.model.apps.DoneableStatefulSet; +import io.fabric8.kubernetes.api.model.apps.StatefulSet; +import io.fabric8.kubernetes.api.model.apps.StatefulSetList; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable; @@ -65,7 +65,7 @@ protected void onAdd(SparkCluster cluster) { protected void onDelete(SparkCluster cluster) { String name = cluster.getName(); client.services().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete(); - client.replicationControllers().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete(); + client.apps().statefulSets().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete(); client.pods().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete(); client.persistentVolumeClaims().inNamespace(namespace).withLabels(getDeployer().getDefaultLabels(name)).delete(); getClusters().delete(name); @@ -93,7 +93,7 @@ protected void onModify(SparkCluster newCluster) { if (isOnlyScale(existingCluster, newCluster)) { log.info("{}scaling{} from {}{}{} worker replicas to {}{}{}", re(), xx(), ye(), existingCluster.getWorker().getInstances(), xx(), ye(), newWorkers, xx()); - client.replicationControllers().inNamespace(namespace).withName(name + "-w").scale(newWorkers); + client.apps().statefulSets().inNamespace(namespace).withName(name + "-w").scale(newWorkers); // update metrics MetricsHelper.workers.labels(newCluster.getName(), namespace).set(newCluster.getWorker().getInstances()); @@ -199,18 +199,19 @@ public void fullReconciliation() { } private Map getActual() { - 
MixedOperation> aux1 = - client.replicationControllers(); - FilterWatchListMultiDeletable> aux2 = + + MixedOperation> aux1 = + client.apps().statefulSets(); + FilterWatchListMultiDeletable> aux2 = "*".equals(namespace) ? aux1.inAnyNamespace() : aux1.inNamespace(namespace); Map labels =new HashMap<>(2); labels.put(prefix + OPERATOR_KIND_LABEL, entityName); labels.put(prefix + OPERATOR_RC_TYPE_LABEL, "worker"); - List workerRcs = aux2.withLabels(labels).list().getItems(); - Map retMap = workerRcs + List workerSss = aux2.withLabels(labels).list().getItems(); + Map retMap = workerSss .stream() - .collect(Collectors.toMap(rc -> rc.getMetadata().getLabels().get(prefix + entityName), - rc -> rc.getSpec().getReplicas())); + .collect(Collectors.toMap(ss -> ss.getMetadata().getLabels().get(prefix + entityName), + ss -> ss.getSpec().getReplicas())); return retMap; } diff --git a/src/main/resources/schema/sparkCluster.json b/src/main/resources/schema/sparkCluster.json index a96d5d55..97208b8a 100644 --- a/src/main/resources/schema/sparkCluster.json +++ b/src/main/resources/schema/sparkCluster.json @@ -38,6 +38,25 @@ "items": { "type": "string" } + }, + "persistentVolumes": { + "type": "array", + "items": { + "type": "object", + "properties": { + "mountPath": { + "type": "string" + }, + "size": { + "type": "string", + "default": "0.3Gi" + }, + "matchLabels": { + "type": "object", + "existingJavaType": "java.util.Map" + } + } + } } } }, @@ -71,6 +90,26 @@ "items": { "type": "string" } + }, + "persistentVolumes": { + "type": "array", + "items": { + "existingJavaType": "io.radanalytics.types.PersistentVolume", + "type": "object", + "properties": { + "mountPath": { + "type": "string" + }, + "size": { + "type": "string", + "default": "0.3Gi" + }, + "matchLabels": { + "type": "object", + "existingJavaType": "java.util.Map" + } + } + } } } },