diff options
Diffstat (limited to 'yarn/src/main/scala')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 32 | ||||
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 11 |
2 files changed, 30 insertions, 13 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b74ea9a10a..bc28ce5eea 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -80,10 +80,12 @@ private[spark] class Client( private val isClusterMode = args.isClusterMode private var loginFromKeytab = false + private var principal: String = null + private var keytab: String = null + private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) - def stop(): Unit = yarnClient.stop() /** @@ -339,7 +341,7 @@ private[spark] class Client( if (loginFromKeytab) { logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + " via the YARN Secure Distributed Cache.") - val (_, localizedPath) = distribute(args.keytab, + val (_, localizedPath) = distribute(keytab, destName = Some(sparkConf.get("spark.yarn.keytab")), appMasterOnly = true) require(localizedPath != null, "Keytab file already distributed.") @@ -785,19 +787,27 @@ private[spark] class Client( } def setupCredentials(): Unit = { - if (args.principal != null) { - require(args.keytab != null, "Keytab must be specified when principal is specified.") + loginFromKeytab = args.principal != null || sparkConf.contains("spark.yarn.principal") + if (loginFromKeytab) { + principal = + if (args.principal != null) args.principal else sparkConf.get("spark.yarn.principal") + keytab = { + if (args.keytab != null) { + args.keytab + } else { + sparkConf.getOption("spark.yarn.keytab").orNull + } + } + + require(keytab != null, "Keytab must be specified when principal is specified.") logInfo("Attempting to login to the Kerberos" + - s" using principal: ${args.principal} and keytab: ${args.keytab}") - val f = new File(args.keytab) + s" using principal: $principal and keytab: $keytab") + val f = new File(keytab) // Generate a file name that can be used for the keytab file, that does not conflict // with any user file. val keytabFileName = f.getName + "-" + UUID.randomUUID().toString - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) - loginFromKeytab = true sparkConf.set("spark.yarn.keytab", keytabFileName) - sparkConf.set("spark.yarn.principal", args.principal) - logInfo("Successfully logged into the KDC.") + sparkConf.set("spark.yarn.principal", principal) } credentials = UserGroupInformation.getCurrentUser.getCredentials } @@ -1162,7 +1172,7 @@ object Client extends Logging { * * If not a "local:" file and no alternate name, the environment is not modified. * - * @parma conf Spark configuration. + * @param conf Spark configuration. * @param uri URI to add to classpath (optional). * @param fileName Alternate name for the file (optional). * @param env Map holding the environment variables. diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 3a0b9443d2..d97fa2e215 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -20,10 +20,9 @@ package org.apache.spark.scheduler.cluster import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} -import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.spark.{SparkException, Logging, SparkContext} -import org.apache.spark.deploy.yarn.{Client, ClientArguments} +import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil} import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class YarnClientSchedulerBackend( @@ -62,6 +61,13 @@ private[spark] class YarnClientSchedulerBackend( super.start() waitForApplication() + + // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver + // reads the credentials from HDFS, just like the executors and updates its own credentials + // cache. + if (conf.contains("spark.yarn.credentials.file")) { + YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf) + } monitorThread = asyncMonitorApplication() monitorThread.start() } @@ -158,6 +164,7 @@ private[spark] class YarnClientSchedulerBackend( } super.stop() client.stop() + YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer() logInfo("Stopped") } |