diff --git a/include/util.sh b/include/util.sh
index e226ba0..4b0e2bd 100644
--- a/include/util.sh
+++ b/include/util.sh
@@ -31,7 +31,7 @@ clone_build_spark() {
   fi
   cd $spark_repo_local_dir
   git checkout -B $branch origin/$branch
-  ./dev/make-distribution.sh --tgz -Phadoop-2.7 -Pkubernetes -DskipTests;
+  ./dev/make-distribution.sh --tgz -Phadoop-2.7 -Pkubernetes -Phive -Phive-thriftserver -DskipTests;
   SPARK_TGZ=$(find $spark_repo_local_dir -name spark-*.tgz)
 
   popd
diff --git a/pom.xml b/pom.xml
index e3e48fc..95166b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,9 @@
     <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
     <extraScalaTestArgs></extraScalaTestArgs>
     <guava.version>18.0</guava.version>
+    <hadoop.version>2.6.5</hadoop.version>
+    <hive.group>org.spark-project.hive</hive.group>
+    <hive.version>1.2.1.spark2</hive.version>
     <jsr305.version>1.3.9</jsr305.version>
     <kubernetes-client.version>3.0.0</kubernetes-client.version>
     <log4j.version>1.2.17</log4j.version>
@@ -84,11 +87,21 @@
       <artifactId>log4j</artifactId>
       <version>${log4j.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
       <version>${commons-lang3.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 89c8a91..d352827 100644
--- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -20,10 +20,12 @@ import java.io.File
 import java.nio.file.{Path, Paths}
 import java.util.UUID
 import java.util.regex.Pattern
+import java.sql.DriverManager
 
 import scala.collection.JavaConverters._
 import com.google.common.io.PatternFilenameFilter
 import io.fabric8.kubernetes.api.model.{Container, Pod}
+import org.apache.hive.jdbc.HiveDriver
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
@@ -121,6 +123,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
     runSparkPiAndVerifyCompletion(appArgs = Array("5"))
   }
 
+  test("Run Spark Thrift Server") {
+    runThriftServerAndVerifyQuery()
+  }
+
   test("Run SparkPi with custom labels, annotations, and environment variables.") {
     sparkAppConf
       .set("spark.kubernetes.driver.label.label1", "label1-value")
@@ -239,6 +245,46 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       appLocator)
   }
 
+  private def runThriftServerAndVerifyQuery(
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      appArgs: Array[String] = Array.empty[String],
+      appLocator: String = appLocator): Unit = {
+    val appArguments = SparkAppArguments(
+      mainAppResource = "",
+      mainClass = "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
+      appArgs = appArgs)
+    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+    val driverPod = kubernetesTestComponents.kubernetesClient
+      .pods
+      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-role", "driver")
+      .list()
+      .getItems
+      .get(0)
+    driverPodChecker(driverPod)
+    val driverPodResource = kubernetesTestComponents.kubernetesClient
+      .pods
+      .withName(driverPod.getMetadata.getName)
+
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      val localPort = driverPodResource.portForward(10000).getLocalPort
+      val jdbcUri = s"jdbc:hive2://localhost:$localPort/"
+      val connection = DriverManager.getConnection(jdbcUri, "user", "pass")
+      val statement = connection.createStatement()
+      try {
+        val resultSet = statement.executeQuery("select 42")
+        resultSet.next()
+        assert(resultSet.getInt(1) == 42)
+      } finally {
+        try {
+          statement.close()
+        } finally {
+          connection.close()
+        }
+      }
+    }
+  }
+
   private def runSparkApplicationAndVerifyCompletion(
       appResource: String,
       mainClass: String,
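
Note: the new test's port-forward-and-query flow can also be reproduced by hand against a running Thrift Server driver pod. A minimal sketch, assuming a deployed driver pod; the pod name and the user/pass credentials are placeholders, matching the dummy values the test passes to DriverManager.getConnection:

    # Forward the Thrift Server's default port (10000) from the driver pod,
    # then issue the test's query over the forwarded port with beeline.
    kubectl port-forward <thrift-server-driver-pod> 10000:10000 &
    beeline -u 'jdbc:hive2://localhost:10000/' -n user -p pass -e 'select 42'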