aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorwitgo <witgo@qq.com>2014-06-19 12:11:26 -0500
committerThomas Graves <tgraves@apache.org>2014-06-19 12:11:26 -0500
commitbce0897bc6b0fc9bca5444dbe3a9e75523ad7481 (patch)
treec0b87857704b2bb014c2bd3a3409d6749b9ee8bb /yarn
parent67fca189c944b8f8ba222bb471e343893031bd7b (diff)
downloadspark-bce0897bc6b0fc9bca5444dbe3a9e75523ad7481.tar.gz
spark-bce0897bc6b0fc9bca5444dbe3a9e75523ad7481.tar.bz2
spark-bce0897bc6b0fc9bca5444dbe3a9e75523ad7481.zip
[SPARK-2051]In yarn.ClientBase spark.yarn.dist.* do not work
Author: witgo <witgo@qq.com> Closes #969 from witgo/yarn_ClientBase and squashes the following commits: 8117765 [witgo] review commit 3bdbc52 [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase 5261b6c [witgo] fix sys.props.get("SPARK_YARN_DIST_FILES") e3c1107 [witgo] update docs b6a9aa1 [witgo] merge master c8b4554 [witgo] review commit 2f48789 [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase 8d7b82f [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase 1048549 [witgo] remove Utils.resolveURIs 871f1db [witgo] add spark.yarn.dist.* documentation 41bce59 [witgo] review commit 35d6fa0 [witgo] move to ClientArguments 55d72fc [witgo] Merge branch 'master' of https://github.com/apache/spark into yarn_ClientBase 9cdff16 [witgo] review commit 8bc2f4b [witgo] review commit 20e667c [witgo] Merge branch 'master' into yarn_ClientBase 0961151 [witgo] merge master ce609fc [witgo] Merge branch 'master' into yarn_ClientBase 8362489 [witgo] yarn.ClientBase spark.yarn.dist.* do not work
Diffstat (limited to 'yarn')
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala15
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala3
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala4
3 files changed, 16 insertions, 6 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fd3ef9e1fa..62f9b3cf5a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.InputFormatInfo
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
@@ -45,6 +44,18 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
parseArgs(args.toList)
+ // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
+ // it should default to hdfs://
+ files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
+ archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+
+ // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
+ // for both yarn-client and yarn-cluster
+ files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
+ map(p => Utils.resolveURIs(p)).orNull)
+ archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
+ map(p => Utils.resolveURIs(p)).orNull)
+
private def parseArgs(inputArgs: List[String]): Unit = {
val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 858bcaa95b..8f22675999 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -162,7 +162,7 @@ trait ClientBase extends Logging {
val fs = FileSystem.get(conf)
val remoteFs = originalPath.getFileSystem(conf)
var newPath = originalPath
- if (! compareFs(remoteFs, fs)) {
+ if (!compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
@@ -250,6 +250,7 @@ trait ClientBase extends Logging {
}
}
}
+ logInfo("Prepared Local resources " + localResources)
sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
UserGroupInformation.getCurrentUser().addCredentials(credentials)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 039cf4f276..412dfe38d5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -70,9 +70,7 @@ private[spark] class YarnClientSchedulerBackend(
("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
- ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
- ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
- ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
+ ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
.foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
logDebug("ClientArguments called with: " + argsArrayBuf)