aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorLianhui Wang <lianhuiwang09@gmail.com>2015-10-13 08:29:47 -0500
committerTom Graves <tgraves@yahoo-inc.com>2015-10-13 08:29:47 -0500
commit626aab79c9b4d4ac9d65bf5fa45b81dd9cbc609c (patch)
tree9f6a7e127abf9d31d0a4c933aa45238226f8437a /yarn
parentc4da5345a0ef643a7518756caaa18ff3f3ea9acc (diff)
downloadspark-626aab79c9b4d4ac9d65bf5fa45b81dd9cbc609c.tar.gz
spark-626aab79c9b4d4ac9d65bf5fa45b81dd9cbc609c.tar.bz2
spark-626aab79c9b4d4ac9d65bf5fa45b81dd9cbc609c.zip
[SPARK-11026] [YARN] spark.yarn.user.classpath.first doesn't work for 'spark-submit --jars hdfs://user/foo.jar'
When spark.yarn.user.classpath.first=true and using 'spark-submit --jars hdfs://user/foo.jar', Spark cannot put foo.jar onto the system classpath. So we need to add YARN's link names for the jars to the system classpath. vanzin tgravescs Author: Lianhui Wang <lianhuiwang09@gmail.com> Closes #9045 from lianhuiwang/spark-11026.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala23
1 file changed, 15 insertions, 8 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d25d830fd4..9fcfe362a3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1212,7 +1212,7 @@ object Client extends Logging {
} else {
getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR))
}
- mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env))
+ mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env))
val secondaryJars =
if (args != null) {
@@ -1221,10 +1221,10 @@ object Client extends Logging {
getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
}
secondaryJars.foreach { x =>
- addFileToClasspath(sparkConf, x, null, env)
+ addFileToClasspath(sparkConf, conf, x, null, env)
}
}
- addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+ addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
populateHadoopClasspath(conf, env)
sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1259,15 +1259,17 @@ object Client extends Logging {
* If an alternate name for the file is given, and it's not a "local:" file, the alternate
* name will be added to the classpath (relative to the job's work directory).
*
- * If not a "local:" file and no alternate name, the environment is not modified.
+ * If not a "local:" file and no alternate name, the linkName will be added to the classpath.
*
- * @param conf Spark configuration.
- * @param uri URI to add to classpath (optional).
- * @param fileName Alternate name for the file (optional).
- * @param env Map holding the environment variables.
+ * @param conf Spark configuration.
+ * @param hadoopConf Hadoop configuration.
+ * @param uri URI to add to classpath (optional).
+ * @param fileName Alternate name for the file (optional).
+ * @param env Map holding the environment variables.
*/
private def addFileToClasspath(
conf: SparkConf,
+ hadoopConf: Configuration,
uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
@@ -1276,6 +1278,11 @@ object Client extends Logging {
} else if (fileName != null) {
addClasspathEntry(buildPath(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+ } else if (uri != null) {
+ val localPath = getQualifiedLocalPath(uri, hadoopConf)
+ val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
+ addClasspathEntry(buildPath(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
}
}