aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala2
-rw-r--r--docs/running-on-yarn.md16
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala14
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala6
5 files changed, 36 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 92bb5059a0..d1b32ea077 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -428,6 +428,8 @@ object SparkSubmit {
OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
+ OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
+ OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"),
// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
@@ -440,10 +442,8 @@ object SparkSubmit {
OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
-
- // Yarn client or cluster
- OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
- OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
+ OptionAssigner(args.principal, YARN, CLUSTER, clOption = "--principal"),
+ OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"),
// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c0e4c77190..cc6a7bd9f4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -169,6 +169,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
numExecutors = Option(numExecutors)
.getOrElse(sparkProperties.get("spark.executor.instances").orNull)
+ keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
+ principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 9d55f435e8..96cf612c54 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -242,6 +242,22 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
running against earlier versions, this property will be ignored.
</td>
</tr>
+<tr>
+ <td><code>spark.yarn.keytab</code></td>
+ <td>(none)</td>
+ <td>
+ The full path to the file that contains the keytab for the principal specified above.
+ This keytab will be copied to the node running the Application Master via the Secure Distributed Cache,
+ for renewing the login tickets and the delegation tokens periodically.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.yarn.principal</code></td>
+ <td>(none)</td>
+ <td>
+ Principal to be used to login to KDC, while running on secure HDFS.
+ </td>
+</tr>
</table>
# Launching Spark on YARN
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index aaae6f9734..77af46c192 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -60,8 +60,11 @@ private[yarn] class AMDelegationTokenRenewer(
private val hadoopUtil = YarnSparkHadoopUtil.get
- private val daysToKeepFiles = sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
- private val numFilesToKeep = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+ private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+ private val daysToKeepFiles =
+ sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
+ private val numFilesToKeep =
+ sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
/**
* Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -121,7 +124,7 @@ private[yarn] class AMDelegationTokenRenewer(
import scala.concurrent.duration._
try {
val remoteFs = FileSystem.get(hadoopConf)
- val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+ val credentialsPath = new Path(credentialsFile)
val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
hadoopUtil.listFilesSorted(
remoteFs, credentialsPath.getParent,
@@ -160,7 +163,7 @@ private[yarn] class AMDelegationTokenRenewer(
val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
val tempCreds = keytabLoggedInUGI.getCredentials
- val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+ val credentialsPath = new Path(credentialsFile)
val dst = credentialsPath.getParent
keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
// Get a copy of the credentials
@@ -186,8 +189,7 @@ private[yarn] class AMDelegationTokenRenewer(
}
val nextSuffix = lastCredentialsFileSuffix + 1
val tokenPathStr =
- sparkConf.get("spark.yarn.credentials.file") +
- SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
+ credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
val tokenPath = new Path(tokenPathStr)
val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 5653c9f14d..9c7b1b3988 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -98,6 +98,12 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
numExecutors = initialNumExecutors
}
+ principal = Option(principal)
+ .orElse(sparkConf.getOption("spark.yarn.principal"))
+ .orNull
+ keytab = Option(keytab)
+ .orElse(sparkConf.getOption("spark.yarn.keytab"))
+ .orNull
}
/**