aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2015-07-17 09:38:08 -0500
committerTom Graves <tgraves@yahoo-inc.com>2015-07-17 09:38:08 -0500
commitc043a3e9df55721f21332f7c44ff351832d20324 (patch)
tree565bb5f19c0382bb9696590e0f969310a72d3572 /yarn
parentec8973d1245d4a99edeb7365d7f4b0063ac31ddf (diff)
downloadspark-c043a3e9df55721f21332f7c44ff351832d20324.tar.gz
spark-c043a3e9df55721f21332f7c44ff351832d20324.tar.bz2
spark-c043a3e9df55721f21332f7c44ff351832d20324.zip
[SPARK-8851] [YARN] In Client mode, make sure the client logs in and updates tokens
In client side, the flow is SparkSubmit -> SparkContext -> yarn/Client. Since the yarn client only gets a cloned config and the staging dir is set here, it is not really possible to do re-logins in the SparkContext. So, do the initial logins in Spark Submit and do re-logins as we do now in the AM, but the Client behaves like an executor in this specific context and reads the credentials file to update the tokens. This way, even if the streaming context is started up from checkpoint - it is fine since we have logged in from SparkSubmit itself itself. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #7394 from harishreedharan/yarn-client-login and squashes the following commits: 9a2166f [Hari Shreedharan] make it possible to use command line args and config parameters together. de08f57 [Hari Shreedharan] Fix import order. 5c4fa63 [Hari Shreedharan] Add a comment explaining what is being done in YarnClientSchedulerBackend. c872caa [Hari Shreedharan] Fix typo in log message. 2c80540 [Hari Shreedharan] Move token renewal to YarnClientSchedulerBackend. 0c48ac2 [Hari Shreedharan] Remove direct use of ExecutorDelegationTokenUpdater in Client. 26f8bfa [Hari Shreedharan] [SPARK-8851][YARN] In Client mode, make sure the client logs in and updates tokens. 58b1969 [Hari Shreedharan] Simple attempt 1.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala32
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala11
2 files changed, 30 insertions, 13 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b74ea9a10a..bc28ce5eea 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -80,10 +80,12 @@ private[spark] class Client(
private val isClusterMode = args.isClusterMode
private var loginFromKeytab = false
+ private var principal: String = null
+ private var keytab: String = null
+
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
-
def stop(): Unit = yarnClient.stop()
/**
@@ -339,7 +341,7 @@ private[spark] class Client(
if (loginFromKeytab) {
logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
" via the YARN Secure Distributed Cache.")
- val (_, localizedPath) = distribute(args.keytab,
+ val (_, localizedPath) = distribute(keytab,
destName = Some(sparkConf.get("spark.yarn.keytab")),
appMasterOnly = true)
require(localizedPath != null, "Keytab file already distributed.")
@@ -785,19 +787,27 @@ private[spark] class Client(
}
def setupCredentials(): Unit = {
- if (args.principal != null) {
- require(args.keytab != null, "Keytab must be specified when principal is specified.")
+ loginFromKeytab = args.principal != null || sparkConf.contains("spark.yarn.principal")
+ if (loginFromKeytab) {
+ principal =
+ if (args.principal != null) args.principal else sparkConf.get("spark.yarn.principal")
+ keytab = {
+ if (args.keytab != null) {
+ args.keytab
+ } else {
+ sparkConf.getOption("spark.yarn.keytab").orNull
+ }
+ }
+
+ require(keytab != null, "Keytab must be specified when principal is specified.")
logInfo("Attempting to login to the Kerberos" +
- s" using principal: ${args.principal} and keytab: ${args.keytab}")
- val f = new File(args.keytab)
+ s" using principal: $principal and keytab: $keytab")
+ val f = new File(keytab)
// Generate a file name that can be used for the keytab file, that does not conflict
// with any user file.
val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
- UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
- loginFromKeytab = true
sparkConf.set("spark.yarn.keytab", keytabFileName)
- sparkConf.set("spark.yarn.principal", args.principal)
- logInfo("Successfully logged into the KDC.")
+ sparkConf.set("spark.yarn.principal", principal)
}
credentials = UserGroupInformation.getCurrentUser.getCredentials
}
@@ -1162,7 +1172,7 @@ object Client extends Logging {
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
- * @parma conf Spark configuration.
+ * @param conf Spark configuration.
* @param uri URI to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 3a0b9443d2..d97fa2e215 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -20,10 +20,9 @@ package org.apache.spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class YarnClientSchedulerBackend(
@@ -62,6 +61,13 @@ private[spark] class YarnClientSchedulerBackend(
super.start()
waitForApplication()
+
+ // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
+ // reads the credentials from HDFS, just like the executors and updates its own credentials
+ // cache.
+ if (conf.contains("spark.yarn.credentials.file")) {
+ YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf)
+ }
monitorThread = asyncMonitorApplication()
monitorThread.start()
}
@@ -158,6 +164,7 @@ private[spark] class YarnClientSchedulerBackend(
}
super.stop()
client.stop()
+ YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
logInfo("Stopped")
}