aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala10
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala32
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala11
4 files changed, 56 insertions, 26 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 9f94118829..6b14d407a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Comparator}
import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps
+import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
@@ -248,19 +249,25 @@ class SparkHadoopUtil extends Logging {
dir: Path,
prefix: String,
exclusionSuffix: String): Array[FileStatus] = {
- val fileStatuses = remoteFs.listStatus(dir,
- new PathFilter {
- override def accept(path: Path): Boolean = {
- val name = path.getName
- name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ try {
+ val fileStatuses = remoteFs.listStatus(dir,
+ new PathFilter {
+ override def accept(path: Path): Boolean = {
+ val name = path.getName
+ name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ }
+ })
+ Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+ override def compare(o1: FileStatus, o2: FileStatus): Int = {
+ Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
- Arrays.sort(fileStatuses, new Comparator[FileStatus] {
- override def compare(o1: FileStatus, o2: FileStatus): Int = {
- Longs.compare(o1.getModificationTime, o2.getModificationTime)
- }
- })
- fileStatuses
+ fileStatuses
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Error while attempting to list files from application staging dir", e)
+ Array.empty
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 036cb6e054..0b39ee8fe3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -508,8 +508,14 @@ object SparkSubmit {
}
// Let YARN know it's a pyspark app, so it distributes needed libraries.
- if (clusterManager == YARN && args.isPython) {
- sysProps.put("spark.yarn.isPython", "true")
+ if (clusterManager == YARN) {
+ if (args.isPython) {
+ sysProps.put("spark.yarn.isPython", "true")
+ }
+ if (args.principal != null) {
+      require(args.keytab != null, "Keytab must be specified when the principal is specified")
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ }
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b74ea9a10a..bc28ce5eea 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -80,10 +80,12 @@ private[spark] class Client(
private val isClusterMode = args.isClusterMode
private var loginFromKeytab = false
+ private var principal: String = null
+ private var keytab: String = null
+
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
-
def stop(): Unit = yarnClient.stop()
/**
@@ -339,7 +341,7 @@ private[spark] class Client(
if (loginFromKeytab) {
logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
" via the YARN Secure Distributed Cache.")
- val (_, localizedPath) = distribute(args.keytab,
+ val (_, localizedPath) = distribute(keytab,
destName = Some(sparkConf.get("spark.yarn.keytab")),
appMasterOnly = true)
require(localizedPath != null, "Keytab file already distributed.")
@@ -785,19 +787,27 @@ private[spark] class Client(
}
def setupCredentials(): Unit = {
- if (args.principal != null) {
- require(args.keytab != null, "Keytab must be specified when principal is specified.")
+ loginFromKeytab = args.principal != null || sparkConf.contains("spark.yarn.principal")
+ if (loginFromKeytab) {
+ principal =
+ if (args.principal != null) args.principal else sparkConf.get("spark.yarn.principal")
+ keytab = {
+ if (args.keytab != null) {
+ args.keytab
+ } else {
+ sparkConf.getOption("spark.yarn.keytab").orNull
+ }
+ }
+
+ require(keytab != null, "Keytab must be specified when principal is specified.")
logInfo("Attempting to login to the Kerberos" +
- s" using principal: ${args.principal} and keytab: ${args.keytab}")
- val f = new File(args.keytab)
+ s" using principal: $principal and keytab: $keytab")
+ val f = new File(keytab)
// Generate a file name that can be used for the keytab file, that does not conflict
// with any user file.
val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
- UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
- loginFromKeytab = true
sparkConf.set("spark.yarn.keytab", keytabFileName)
- sparkConf.set("spark.yarn.principal", args.principal)
- logInfo("Successfully logged into the KDC.")
+ sparkConf.set("spark.yarn.principal", principal)
}
credentials = UserGroupInformation.getCurrentUser.getCredentials
}
@@ -1162,7 +1172,7 @@ object Client extends Logging {
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
- * @parma conf Spark configuration.
+ * @param conf Spark configuration.
* @param uri URI to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 3a0b9443d2..d97fa2e215 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -20,10 +20,9 @@ package org.apache.spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class YarnClientSchedulerBackend(
@@ -62,6 +61,13 @@ private[spark] class YarnClientSchedulerBackend(
super.start()
waitForApplication()
+
+ // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
+ // reads the credentials from HDFS, just like the executors and updates its own credentials
+ // cache.
+ if (conf.contains("spark.yarn.credentials.file")) {
+ YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf)
+ }
monitorThread = asyncMonitorApplication()
monitorThread.start()
}
@@ -158,6 +164,7 @@ private[spark] class YarnClientSchedulerBackend(
}
super.stop()
client.stop()
+ YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
logInfo("Stopped")
}