Skip to content

Commit

Permalink
[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YA…
Browse files Browse the repository at this point in the history
…RN/HDFS

Take 2. Does the same thing as apache#4688, but fixes Hadoop-1 build.

Author: Hari Shreedharan <[email protected]>

Closes apache#5823 from harishreedharan/kerberos-longrunning and squashes the following commits:

3c86bba [Hari Shreedharan] Import fixes. Import postfixOps explicitly.
4d04301 [Hari Shreedharan] Minor formatting fixes.
b5e7a72 [Hari Shreedharan] Remove reflection, use a method in SparkHadoopUtil to update the token renewer.
7bff6e9 [Hari Shreedharan] Make sure all required classes are present in the jar. Fix import order.
e851f70 [Hari Shreedharan] Move the ExecutorDelegationTokenRenewer to yarn module. Use reflection to use it.
36eb8a9 [Hari Shreedharan] Change the renewal interval config param. Fix a bunch of comments.
611923a [Hari Shreedharan] Make sure the namenodes are listed correctly for creating tokens.
09fe224 [Hari Shreedharan] Use token.renew to get token's renewal interval rather than using hdfs-site.xml
6963bbc [Hari Shreedharan] Schedule renewal in AM before starting user class. Else, a restarted AM cannot access HDFS if the user class tries to.
072659e [Hari Shreedharan] Fix build failure caused by thread factory getting moved to ThreadUtils.
f041dd3 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
42eead4 [Hari Shreedharan] Remove RPC part. Refactor and move methods around, use renewal interval rather than max lifetime to create new tokens.
ebb36f5 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
bc083e3 [Hari Shreedharan] Overload RegisteredExecutor to send tokens. Minor doc updates.
7b19643 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
8a4f268 [Hari Shreedharan] Added docs in the security guide. Changed some code to ensure that the renewer objects are created only if required.
e800c8b [Hari Shreedharan] Restore original RegisteredExecutor message, and send new tokens via NewTokens message.
0e9507e [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
7f1bc58 [Hari Shreedharan] Minor fixes, cleanup.
bcd11f9 [Hari Shreedharan] Refactor AM and Executor token update code into separate classes, also send tokens via akka on executor startup.
f74303c [Hari Shreedharan] Move the new logic into specialized classes. Add cleanup for old credentials files.
2f9975c [Hari Shreedharan] Ensure new tokens are written out immediately on AM restart. Also, pikc up the latest suffix from HDFS if the AM is restarted.
61b2b27 [Hari Shreedharan] Account for AM restarts by making sure lastSuffix is read from the files on HDFS.
62c45ce [Hari Shreedharan] Relogin from keytab periodically.
fa233bd [Hari Shreedharan] Adding logging, fixing minor formatting and ordering issues.
42813b4 [Hari Shreedharan] Remove utils.sh, which was re-added due to merge with master.
0de27ee [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
55522e3 [Hari Shreedharan] Fix failure caused by Preconditions ambiguity.
9ef5f1b [Hari Shreedharan] Added explanation of how the credentials refresh works, some other minor fixes.
f4fd711 [Hari Shreedharan] Fix SparkConf usage.
2debcea [Hari Shreedharan] Change the file structure for credentials files. I will push a followup patch which adds a cleanup mechanism for old credentials files. The credentials files are small and few enough for it to cause issues on HDFS.
af6d5f0 [Hari Shreedharan] Cleaning up files where changes weren't required.
f0f54cb [Hari Shreedharan] Be more defensive when updating the credentials file.
f6954da [Hari Shreedharan] Got rid of Akka communication to renew, instead the executors check a known file's modification time to read the credentials.
5c11c3e [Hari Shreedharan] Move tests to YarnSparkHadoopUtil to fix compile issues.
b4cb917 [Hari Shreedharan] Send keytab to AM via DistributedCache rather than directly via HDFS
0985b4e [Hari Shreedharan] Write tokens to HDFS and read them back when required, rather than sending them over the wire.
d79b2b9 [Hari Shreedharan] Make sure correct credentials are passed to FileSystem#addDelegationTokens()
8c6928a [Hari Shreedharan] Fix issue caused by direct creation of Actor object.
fb27f46 [Hari Shreedharan] Make sure principal and keytab are set before CoarseGrainedSchedulerBackend is started. Also schedule re-logins in CoarseGrainedSchedulerBackend#start()
41efde0 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
d282d7a [Hari Shreedharan] Fix ClientSuite to set YARN mode, so that the correct class is used in tests.
bcfc374 [Hari Shreedharan] Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with impl in YarnSparkHadoopUtil.
f8fe694 [Hari Shreedharan] Handle None if keytab-login is not scheduled.
2b0d745 [Hari Shreedharan] [SPARK-5342][YARN] Allow long running Spark apps to run on secure YARN/HDFS.
ccba5bc [Hari Shreedharan] WIP: More changes wrt kerberos
77914dd [Hari Shreedharan] WIP: Add kerberos principal and keytab to YARN client.
  • Loading branch information
harishreedharan authored and tgravescs committed May 1, 2015
1 parent 4dc8d74 commit b1f4ca8
Show file tree
Hide file tree
Showing 16 changed files with 657 additions and 109 deletions.
81 changes: 79 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

package org.apache.spark.deploy

import java.io.{ByteArrayInputStream, DataInputStream}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
Expand All @@ -32,14 +36,17 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps

/**
* :: DeveloperApi ::
* Contains util methods to interact with Hadoop from Spark.
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration(new SparkConf())
private val sparkConf = new SparkConf()
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

/**
Expand Down Expand Up @@ -201,6 +208,61 @@ class SparkHadoopUtil extends Logging {
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}

/**
* Lists all the files in a directory with the specified prefix, and does not end with the
* given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
* the respective files.
*/
def listFilesSorted(
remoteFs: FileSystem,
dir: Path,
prefix: String,
exclusionSuffix: String): Array[FileStatus] = {
val fileStatuses = remoteFs.listStatus(dir,
new PathFilter {
override def accept(path: Path): Boolean = {
val name = path.getName
name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
}
})
Arrays.sort(fileStatuses, new Comparator[FileStatus] {
override def compare(o1: FileStatus, o2: FileStatus): Int = {
Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
fileStatuses
}

/**
* How much time is remaining (in millis) from now to (fraction * renewal time for the token that
* is valid the latest)?
* This will return -ve (or 0) value if the fraction of validity has already expired.
*/
def getTimeFromNowToRenewal(
sparkConf: SparkConf,
fraction: Double,
credentials: Credentials): Long = {
val now = System.currentTimeMillis()

val renewalInterval =
sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)

credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.map { t =>
val identifier = new DelegationTokenIdentifier()
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
(identifier.getIssueDate + fraction * renewalInterval).toLong - now
}.foldLeft(0L)(math.max)
}


private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
val fileName = credentialsPath.getName
fileName.substring(
fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
}


private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored

/**
Expand Down Expand Up @@ -231,6 +293,17 @@ class SparkHadoopUtil extends Logging {
}
}
}

/**
* Start a thread to periodically update the current user's credentials with new delegation
* tokens so that writes to HDFS do not fail.
*/
private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}

/**
* Stop the thread that does the delegation token updates.
*/
private[spark] def stopExecutorDelegationTokenRenewer() {}
}

object SparkHadoopUtil {
Expand All @@ -251,6 +324,10 @@ object SparkHadoopUtil {
}
}

val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"

val SPARK_YARN_CREDS_COUNTER_DELIM = "-"

def get: SparkHadoopUtil = {
hadoop
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

// Yarn client or cluster
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),

// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
var proxyUser: String = null
var principal: String = null
var keytab: String = null

// Standalone cluster mode only
var supervise: Boolean = false
Expand Down Expand Up @@ -393,6 +395,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case PROXY_USER =>
proxyUser = value

case PRINCIPAL =>
principal = value

case KEYTAB =>
keytab = value

case HELP =>
printUsageAndExit(0)

Expand Down Expand Up @@ -506,6 +514,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
| --principal PRINCIPAL Principal to be used to login to KDC, while running on
| secure HDFS.
| --keytab KEYTAB The full path to the file that contains the keytab for the
| principal specified above. This keytab will be copied to
| the node running the Application Master via the Secure
| Distributed Cache, for renewing the login tickets and the
| delegation tokens periodically.
""".stripMargin
)
SparkSubmit.exitFn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark.executor
import java.net.URL
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

import scala.collection.mutable
import scala.util.{Failure, Success}

Expand Down Expand Up @@ -168,6 +170,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(key, value)
}
}
if (driverConf.contains("spark.yarn.credentials.file")) {
logInfo("Will periodically update credentials from: " +
driverConf.get("spark.yarn.credentials.file"))
SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)

Expand All @@ -183,6 +191,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {

override protected def log = CoarseGrainedSchedulerBackend.this.log

private val addressToExecutorId = new HashMap[RpcAddress, String]
Expand Down Expand Up @@ -112,6 +113,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}

}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
Expand All @@ -122,7 +124,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
} else {
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)

addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
Expand Down Expand Up @@ -243,6 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
properties += ((key, value))
}
}

// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(
CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
Expand Down
2 changes: 2 additions & 0 deletions docs/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ SSL must be configured on each node and configured for each component involved i
### YARN mode
The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.

For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.

### Standalone mode
The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ class SparkSubmitOptionParser {
// YARN-only options.
protected final String ARCHIVES = "--archives";
protected final String EXECUTOR_CORES = "--executor-cores";
protected final String QUEUE = "--queue";
protected final String KEYTAB = "--keytab";
protected final String NUM_EXECUTORS = "--num-executors";
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";

/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
Expand All @@ -96,11 +98,13 @@ class SparkSubmitOptionParser {
{ EXECUTOR_MEMORY },
{ FILES },
{ JARS },
{ KEYTAB },
{ KILL_SUBMISSION },
{ MASTER },
{ NAME },
{ NUM_EXECUTORS },
{ PACKAGES },
{ PRINCIPAL },
{ PROPERTIES_FILE },
{ PROXY_USER },
{ PY_FILES },
Expand Down
Loading

0 comments on commit b1f4ca8

Please sign in to comment.