author     Hari Shreedharan <hshreedharan@apache.org>    2015-07-17 09:38:08 -0500
committer  Tom Graves <tgraves@yahoo-inc.com>            2015-07-17 09:38:08 -0500
commit     c043a3e9df55721f21332f7c44ff351832d20324 (patch)
tree       565bb5f19c0382bb9696590e0f969310a72d3572 /core
parent     ec8973d1245d4a99edeb7365d7f4b0063ac31ddf (diff)
[SPARK-8851] [YARN] In Client mode, make sure the client logs in and updates tokens
On the client side, the flow is SparkSubmit -> SparkContext -> yarn/Client. Since the yarn Client only gets a cloned config and the staging dir is set there, it is not really possible to do re-logins in the SparkContext. So, do the initial login in SparkSubmit and keep doing re-logins in the AM as we do now; the Client behaves like an executor in this specific context and reads the credentials file to update the tokens. This way, even if the streaming context is started up from a checkpoint, it is fine, since we have logged in from SparkSubmit itself.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #7394 from harishreedharan/yarn-client-login and squashes the following commits:

9a2166f [Hari Shreedharan] Make it possible to use command line args and config parameters together.
de08f57 [Hari Shreedharan] Fix import order.
5c4fa63 [Hari Shreedharan] Add a comment explaining what is being done in YarnClientSchedulerBackend.
c872caa [Hari Shreedharan] Fix typo in log message.
2c80540 [Hari Shreedharan] Move token renewal to YarnClientSchedulerBackend.
0c48ac2 [Hari Shreedharan] Remove direct use of ExecutorDelegationTokenUpdater in Client.
26f8bfa [Hari Shreedharan] [SPARK-8851][YARN] In Client mode, make sure the client logs in and updates tokens.
58b1969 [Hari Shreedharan] Simple attempt 1.
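The yarn-module half of this change is outside the 'core' diffstat below, but per the commit message the yarn-client driver starts the same token updater the executors use. A rough sketch of that idea, assuming the ExecutorDelegationTokenUpdater API of this era (a SparkConf/Configuration constructor plus updateCredentialsIfRequired()) and the spark.yarn.credentials.file setting; none of this is taken from the diff below:

    // Sketch only: approximates the YarnClientSchedulerBackend change described
    // in the commit message; class, method, and config names are assumptions.
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater

    def startTokenUpdaterIfNeeded(conf: SparkConf, hadoopConf: Configuration): Unit = {
      // The AM writes refreshed tokens to the file named by this setting; the
      // yarn-client driver reads them back, just as executors do.
      if (conf.contains("spark.yarn.credentials.file")) {
        val updater = new ExecutorDelegationTokenUpdater(conf, hadoopConf)
        updater.updateCredentialsIfRequired()
      }
    }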
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala     | 10
2 files changed, 26 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 9f94118829..6b14d407a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Comparator}
import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps
+import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
@@ -248,19 +249,25 @@ class SparkHadoopUtil extends Logging {
dir: Path,
prefix: String,
exclusionSuffix: String): Array[FileStatus] = {
- val fileStatuses = remoteFs.listStatus(dir,
- new PathFilter {
- override def accept(path: Path): Boolean = {
- val name = path.getName
- name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ try {
+ val fileStatuses = remoteFs.listStatus(dir,
+ new PathFilter {
+ override def accept(path: Path): Boolean = {
+ val name = path.getName
+ name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ }
+ })
+ Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+ override def compare(o1: FileStatus, o2: FileStatus): Int = {
+ Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
- Arrays.sort(fileStatuses, new Comparator[FileStatus] {
- override def compare(o1: FileStatus, o2: FileStatus): Int = {
- Longs.compare(o1.getModificationTime, o2.getModificationTime)
- }
- })
- fileStatuses
+ fileStatuses
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Error while attempting to list files from application staging dir", e)
+ Array.empty
+ }
}
/**
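For orientation, a hypothetical call site for the method patched above (listFilesSorted in SparkHadoopUtil); the FileSystem handle, staging path, and "credentials-" prefix are illustrative, not taken from this diff. With the new catch block, a listing failure surfaces as an empty result rather than an exception:

    // Hypothetical usage sketch: list credentials files in the staging dir,
    // sorted oldest-first by modification time, skipping in-flight ".tmp" files.
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.deploy.SparkHadoopUtil

    def latestCredentialsFile(fs: FileSystem, stagingDir: Path): Option[Path] = {
      val statuses =
        SparkHadoopUtil.get.listFilesSorted(fs, stagingDir, "credentials-", ".tmp")
      // After this patch a failed listStatus yields an empty array, so callers
      // can simply retry on the next poll instead of handling an exception.
      statuses.lastOption.map(_.getPath)
    }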
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 036cb6e054..0b39ee8fe3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -508,8 +508,14 @@ object SparkSubmit {
}
// Let YARN know it's a pyspark app, so it distributes needed libraries.
- if (clusterManager == YARN && args.isPython) {
- sysProps.put("spark.yarn.isPython", "true")
+ if (clusterManager == YARN) {
+ if (args.isPython) {
+ sysProps.put("spark.yarn.isPython", "true")
+ }
+ if (args.principal != null) {
+ require(args.keytab != null, "Keytab must be specified when principal is specified")
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ }
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
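Finally, a minimal sketch of what the new loginUserFromKeytab call above gives the driver, assuming Hadoop's UserGroupInformation API; the principal and keytab path are placeholders:

    // Sketch: once SparkSubmit logs in from the keytab, subsequent Hadoop RPC
    // calls in the driver run as that principal.
    import org.apache.hadoop.security.UserGroupInformation

    UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/path/to/user.keytab")
    val ugi = UserGroupInformation.getLoginUser
    // Re-login is a no-op unless the TGT is close to expiring.
    ugi.checkTGTAndReloginFromKeytab()

On the command line, this path is reached by passing spark-submit's --principal and --keytab options, which populate args.principal and args.keytab above.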