aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorThomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>2015-10-06 10:18:50 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2015-10-06 10:18:50 -0700
commite9783601599758df87418bf61a7b4636f06714fa (patch)
treee2ef37084947d32f3596b78c8919b28633070de6 /yarn
parent744f03e700b0e3a7c2a92e92edc79d2374c19023 (diff)
downloadspark-e9783601599758df87418bf61a7b4636f06714fa.tar.gz
spark-e9783601599758df87418bf61a7b4636f06714fa.tar.bz2
spark-e9783601599758df87418bf61a7b4636f06714fa.zip
[SPARK-10901] [YARN] spark.yarn.user.classpath.first doesn't work
This should also go into 1.5.2. The issue is that we were no longer adding __app__.jar to the system classpath. Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com> Author: Tom Graves <tgraves@yahoo-inc.com> Closes #8959 from tgravescs/SPARK-10901.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala39
1 files changed, 27 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f8748ef658..eb3b7fb885 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1155,13 +1155,24 @@ object Client extends Logging {
}
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
- val userClassPath =
+ // in order to properly add the app jar when user classpath is first
+ // we have to do the mainJar separate in order to send the right thing
+ // into addFileToClasspath
+ val mainJar =
if (args != null) {
- getUserClasspath(Option(args.userJar), Option(args.addJars))
+ getMainJarUri(Option(args.userJar))
} else {
- getUserClasspath(sparkConf)
+ getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR))
}
- userClassPath.foreach { x =>
+ mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env))
+
+ val secondaryJars =
+ if (args != null) {
+ getSecondaryJarUris(Option(args.addJars))
+ } else {
+ getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+ }
+ secondaryJars.foreach { x =>
addFileToClasspath(sparkConf, x, null, env)
}
}
@@ -1178,16 +1189,20 @@ object Client extends Logging {
* @param conf Spark configuration.
*/
def getUserClasspath(conf: SparkConf): Array[URI] = {
- getUserClasspath(conf.getOption(CONF_SPARK_USER_JAR),
- conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+ val mainUri = getMainJarUri(conf.getOption(CONF_SPARK_USER_JAR))
+ val secondaryUris = getSecondaryJarUris(conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+ (mainUri ++ secondaryUris).toArray
}
- private def getUserClasspath(
- mainJar: Option[String],
- secondaryJars: Option[String]): Array[URI] = {
- val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_))
- val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
- (mainUri ++ secondaryUris).toArray
+ private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
+ mainJar.flatMap { path =>
+ val uri = new URI(path)
+ if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
+ }.orElse(Some(new URI(APP_JAR)))
+ }
+
+ private def getSecondaryJarUris(secondaryJars: Option[String]): Seq[URI] = {
+ secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
}
/**