author     Xiangrui Meng <meng@databricks.com>           2014-05-22 01:52:50 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2014-05-22 01:52:50 -0700
commit     dba314029b4c9d72d7e48a2093b39edd01931f57 (patch)
tree       0f0efab82d551d1ed8901718a05c3b9d72db2c3b /yarn
parent     2a948e7e1a345ae4e3d89ea24123410819d799d1 (diff)
[SPARK-1870] Make spark-submit --jars work in yarn-cluster mode.
Send secondary jars to the distributed cache of all containers and add the cached jars to the classpath before executors start. Tested on a YARN cluster (CDH-5.0).
`spark-submit --jars` also works in standalone mode and in `yarn-client` mode. Thanks to @andrewor14 for testing!
I removed "Doesn't work for drivers in standalone mode with "cluster" deploy mode." from `spark-submit`'s help message, though we haven't tested mesos yet.
CC: @dbtsai @sryza
Author: Xiangrui Meng <meng@databricks.com>
Closes #848 from mengxr/yarn-classpath and squashes the following commits:
23e7df4 [Xiangrui Meng] rename spark.jar to __spark__.jar and app.jar to __app__.jar to avoid conflicts; append $CWD/ and $CWD/* to the classpath; remove unused methods
a40f6ed [Xiangrui Meng] standalone -> cluster
65e04ad [Xiangrui Meng] update spark-submit help message and add a comment for yarn-client
11e5354 [Xiangrui Meng] minor changes
3e7e1c4 [Xiangrui Meng] use sparkConf instead of hadoop conf
dc3c825 [Xiangrui Meng] add secondary jars to classpath in yarn
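For context, the feature this enables is an invocation like `spark-submit --master yarn-cluster --jars libA.jar,libB.jar --class Main app.jar` (the jar and class names are placeholders). The sketch below is a hypothetical, simplified model of the client-side bookkeeping the patch introduces, not the actual `ClientBase` code: each secondary jar's distributed-cache link name is collected while the jar is registered, and the list is then published through a SparkConf key so the container side can later put those links on the classpath.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified sketch (names like collectLinkNames are
// illustrative): collect the distributed-cache link name of each
// secondary jar so the list can be published via SparkConf.
object SecondaryJarsSketch {
  // Same configuration key the patch introduces in ClientBase.
  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

  def collectLinkNames(addJars: String): Seq[String] = {
    val cachedSecondaryJarLinks = ListBuffer.empty[String]
    if (addJars != null && addJars.nonEmpty) {
      addJars.split(',').foreach { jar =>
        val uri = new URI(jar.trim())
        // The link name is the URI fragment if one is given, else the file
        // name; the real code also uploads each jar and registers it with
        // the YARN distributed cache via distCacheMgr.addResource.
        val linkName = Option(uri.getFragment)
          .getOrElse(uri.getPath.split('/').last)
        cachedSecondaryJarLinks += linkName
      }
    }
    cachedSecondaryJarLinks.toList
  }
}
```

For example, `collectLinkNames("libA.jar,hdfs:///tmp/libB.jar#b.jar")` yields `List("libA.jar", "b.jar")`, and the comma-joined result is what would be stored under `spark.yarn.secondary.jars`.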
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 69
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 2
2 files changed, 18 insertions, 53 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 27a518ccda..aeb3f0062d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
  * Client submits an application to the YARN ResourceManager.
  *
  * Depending on the deployment mode this will launch one of two application master classes:
- * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
+ * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
  *      which launches a driver program inside of the cluster.
  * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
  *      request executors on behalf of a driver running outside of the cluster.
@@ -220,10 +220,11 @@ trait ClientBase extends Logging {
       }
     }
 
+    var cachedSecondaryJarLinks = ListBuffer.empty[String]
     val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
       (args.files, LocalResourceType.FILE, false),
       (args.archives, LocalResourceType.ARCHIVE, false) )
-    fileLists.foreach { case (flist, resType, appMasterOnly) =>
+    fileLists.foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { case file: String =>
           val localURI = new URI(file.trim())
@@ -232,11 +233,15 @@ trait ClientBase extends Logging {
             val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
             val destPath = copyRemoteFile(dst, localPath, replication)
             distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
-              linkname, statCache, appMasterOnly)
+              linkname, statCache)
+            if (addToClasspath) {
+              cachedSecondaryJarLinks += linkname
+            }
           }
         }
       }
     }
+    sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
 
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
     localResources
@@ -374,11 +379,12 @@ trait ClientBase extends Logging {
 }
 
 object ClientBase {
-  val SPARK_JAR: String = "spark.jar"
-  val APP_JAR: String = "app.jar"
+  val SPARK_JAR: String = "__spark__.jar"
+  val APP_JAR: String = "__app__.jar"
   val LOG4J_PROP: String = "log4j.properties"
   val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
   val LOCAL_SCHEME = "local"
+  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
 
   def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
 
@@ -479,67 +485,26 @@ object ClientBase {
 
     extraClassPath.foreach(addClasspathEntry)
 
-    addClasspathEntry(Environment.PWD.$())
+    val cachedSecondaryJarLinks =
+      sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
     // Normally the users app.jar is last in case conflicts with spark jars
     if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
       addPwdClasspathEntry(APP_JAR)
+      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
       addPwdClasspathEntry(SPARK_JAR)
       ClientBase.populateHadoopClasspath(conf, env)
     } else {
       addPwdClasspathEntry(SPARK_JAR)
       ClientBase.populateHadoopClasspath(conf, env)
       addPwdClasspathEntry(APP_JAR)
+      cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
     }
+    // Append all class files and jar files under the working directory to the classpath.
+    addClasspathEntry(Environment.PWD.$())
     addPwdClasspathEntry("*")
   }
 
   /**
-   * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
-   * to the classpath.
-   */
-  private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
-    if (args != null) {
-      addClasspathEntry(args.userJar, APP_JAR, env)
-    }
-
-    if (args != null && args.addJars != null) {
-      args.addJars.split(",").foreach { case file: String =>
-        addClasspathEntry(file, null, env)
-      }
-    }
-  }
-
-  /**
-   * Adds the given path to the classpath, handling "local:" URIs correctly.
-   *
-   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
-   * name will be added to the classpath (relative to the job's work directory).
-   *
-   * If not a "local:" file and no alternate name, the environment is not modified.
-   *
-   * @param path Path to add to classpath (optional).
-   * @param fileName Alternate name for the file (optional).
-   * @param env Map holding the environment variables.
-   */
-  private def addClasspathEntry(path: String, fileName: String,
-      env: HashMap[String, String]) : Unit = {
-    if (path != null) {
-      scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
-        val localPath = getLocalPath(path)
-        if (localPath != null) {
-          YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
-            File.pathSeparator)
-          return
-        }
-      }
-    }
-    if (fileName != null) {
-      YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
-        Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
-    }
-  }
-
-  /**
    * Returns the local path if the URI is a "local:" URI, or null otherwise.
    */
   private def getLocalPath(resource: String): String = {
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0ac162538f..e01ed5a57d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
       "--class", "notused",
-      "--jar", null,
+      "--jar", null, // The primary jar will be added dynamically in SparkContext.
       "--args", hostport,
       "--am-class", classOf[ExecutorLauncher].getName
     )
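To make the resulting container classpath concrete, here is a small, self-contained model of the precedence logic in `populateClasspath` after this patch. The object and method names are assumptions for illustration, not Spark API; the entry strings stand in for what `addPwdClasspathEntry` and `populateHadoopClasspath` would actually add to the environment.

```scala
// Assumed standalone model of the classpath ordering after this patch.
object ClasspathOrderSketch {
  val SPARK_JAR = "__spark__.jar"
  val APP_JAR = "__app__.jar"

  def orderedEntries(userClasspathFirst: Boolean, secondaryJars: Seq[String]): Seq[String] = {
    val ordered =
      if (userClasspathFirst) {
        // spark.yarn.user.classpath.first=true: the user's jars win on
        // class conflicts; Spark's jar and the Hadoop classpath come after.
        (APP_JAR +: secondaryJars) ++ Seq(SPARK_JAR, "<hadoop classpath>")
      } else {
        // Default: Spark's jar first, then Hadoop, then the user's jars.
        Seq(SPARK_JAR, "<hadoop classpath>", APP_JAR) ++ secondaryJars
      }
    // Finally the container working directory and everything in it.
    ordered ++ Seq("$PWD", "$PWD/*")
  }
}
```

In the default ordering, `orderedEntries(false, Seq("libA.jar", "libB.jar"))` gives `__spark__.jar`, the Hadoop classpath, `__app__.jar`, `libA.jar`, `libB.jar`, `$PWD`, `$PWD/*`, matching the diff's intent that the app jar and secondary jars come after Spark's own jar unless `spark.yarn.user.classpath.first` is set.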