diff options
author | Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com> | 2015-10-06 10:18:50 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2015-10-06 10:18:50 -0700 |
commit | e9783601599758df87418bf61a7b4636f06714fa (patch) | |
tree | e2ef37084947d32f3596b78c8919b28633070de6 /yarn/src | |
parent | 744f03e700b0e3a7c2a92e92edc79d2374c19023 (diff) | |
download | spark-e9783601599758df87418bf61a7b4636f06714fa.tar.gz spark-e9783601599758df87418bf61a7b4636f06714fa.tar.bz2 spark-e9783601599758df87418bf61a7b4636f06714fa.zip |
[SPARK-10901] [YARN] spark.yarn.user.classpath.first doesn't work
This should go into 1.5.2 also.
The issue is we were no longer adding the __app__.jar to the system classpath.
Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>
Author: Tom Graves <tgraves@yahoo-inc.com>
Closes #8959 from tgravescs/SPARK-10901.
Diffstat (limited to 'yarn/src')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 39 |
1 files changed, 27 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f8748ef658..eb3b7fb885 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1155,13 +1155,24 @@ object Client extends Logging { } if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) { - val userClassPath = + // in order to properly add the app jar when user classpath is first + // we have to do the mainJar separate in order to send the right thing + // into addFileToClasspath + val mainJar = if (args != null) { - getUserClasspath(Option(args.userJar), Option(args.addJars)) + getMainJarUri(Option(args.userJar)) } else { - getUserClasspath(sparkConf) + getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR)) } - userClassPath.foreach { x => + mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env)) + + val secondaryJars = + if (args != null) { + getSecondaryJarUris(Option(args.addJars)) + } else { + getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) + } + secondaryJars.foreach { x => addFileToClasspath(sparkConf, x, null, env) } } @@ -1178,16 +1189,20 @@ object Client extends Logging { * @param conf Spark configuration. */ def getUserClasspath(conf: SparkConf): Array[URI] = { - getUserClasspath(conf.getOption(CONF_SPARK_USER_JAR), - conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) + val mainUri = getMainJarUri(conf.getOption(CONF_SPARK_USER_JAR)) + val secondaryUris = getSecondaryJarUris(conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) + (mainUri ++ secondaryUris).toArray } - private def getUserClasspath( - mainJar: Option[String], - secondaryJars: Option[String]): Array[URI] = { - val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_)) - val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_)) - (mainUri ++ secondaryUris).toArray + private def getMainJarUri(mainJar: Option[String]): Option[URI] = { + mainJar.flatMap { path => + val uri = new URI(path) + if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None + }.orElse(Some(new URI(APP_JAR))) + } + + private def getSecondaryJarUris(secondaryJars: Option[String]): Seq[URI] = { + secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_)) } /** |