aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2014-05-22 01:52:50 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-05-22 01:52:50 -0700
commitdba314029b4c9d72d7e48a2093b39edd01931f57 (patch)
tree0f0efab82d551d1ed8901718a05c3b9d72db2c3b /yarn
parent2a948e7e1a345ae4e3d89ea24123410819d799d1 (diff)
downloadspark-dba314029b4c9d72d7e48a2093b39edd01931f57.tar.gz
spark-dba314029b4c9d72d7e48a2093b39edd01931f57.tar.bz2
spark-dba314029b4c9d72d7e48a2093b39edd01931f57.zip
[SPARK-1870] Make spark-submit --jars work in yarn-cluster mode.
Sent secondary jars to distributed cache of all containers and add the cached jars to classpath before executors start. Tested on a YARN cluster (CDH-5.0). `spark-submit --jars` also works in standalone server and `yarn-client`. Thanks for @andrewor14 for testing! I removed "Doesn't work for drivers in standalone mode with "cluster" deploy mode." from `spark-submit`'s help message, though we haven't tested mesos yet. CC: @dbtsai @sryza Author: Xiangrui Meng <meng@databricks.com> Closes #848 from mengxr/yarn-classpath and squashes the following commits: 23e7df4 [Xiangrui Meng] rename spark.jar to __spark__.jar and app.jar to __app__.jar to avoid confliction apped $CWD/ and $CWD/* to the classpath remove unused methods a40f6ed [Xiangrui Meng] standalone -> cluster 65e04ad [Xiangrui Meng] update spark-submit help message and add a comment for yarn-client 11e5354 [Xiangrui Meng] minor changes 3e7e1c4 [Xiangrui Meng] use sparkConf instead of hadoop conf dc3c825 [Xiangrui Meng] add secondary jars to classpath in yarn
Diffstat (limited to 'yarn')
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala69
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala2
2 files changed, 18 insertions, 53 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 27a518ccda..aeb3f0062d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
* Client submits an application to the YARN ResourceManager.
*
* Depending on the deployment mode this will launch one of two application master classes:
- * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
+ * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
* which launches a driver program inside of the cluster.
* 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
* request executors on behalf of a driver running outside of the cluster.
@@ -220,10 +220,11 @@ trait ClientBase extends Logging {
}
}
+ var cachedSecondaryJarLinks = ListBuffer.empty[String]
val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
(args.files, LocalResourceType.FILE, false),
(args.archives, LocalResourceType.ARCHIVE, false) )
- fileLists.foreach { case (flist, resType, appMasterOnly) =>
+ fileLists.foreach { case (flist, resType, addToClasspath) =>
if (flist != null && !flist.isEmpty()) {
flist.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
@@ -232,11 +233,15 @@ trait ClientBase extends Logging {
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
- linkname, statCache, appMasterOnly)
+ linkname, statCache)
+ if (addToClasspath) {
+ cachedSecondaryJarLinks += linkname
+ }
}
}
}
}
+ sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
UserGroupInformation.getCurrentUser().addCredentials(credentials)
localResources
@@ -374,11 +379,12 @@ trait ClientBase extends Logging {
}
object ClientBase {
- val SPARK_JAR: String = "spark.jar"
- val APP_JAR: String = "app.jar"
+ val SPARK_JAR: String = "__spark__.jar"
+ val APP_JAR: String = "__app__.jar"
val LOG4J_PROP: String = "log4j.properties"
val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
val LOCAL_SCHEME = "local"
+ val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
@@ -479,67 +485,26 @@ object ClientBase {
extraClassPath.foreach(addClasspathEntry)
- addClasspathEntry(Environment.PWD.$())
+ val cachedSecondaryJarLinks =
+ sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
// Normally the users app.jar is last in case conflicts with spark jars
if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
addPwdClasspathEntry(APP_JAR)
+ cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
addPwdClasspathEntry(SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)
} else {
addPwdClasspathEntry(SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)
addPwdClasspathEntry(APP_JAR)
+ cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
}
+ // Append all class files and jar files under the working directory to the classpath.
+ addClasspathEntry(Environment.PWD.$())
addPwdClasspathEntry("*")
}
/**
- * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
- * to the classpath.
- */
- private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
- if (args != null) {
- addClasspathEntry(args.userJar, APP_JAR, env)
- }
-
- if (args != null && args.addJars != null) {
- args.addJars.split(",").foreach { case file: String =>
- addClasspathEntry(file, null, env)
- }
- }
- }
-
- /**
- * Adds the given path to the classpath, handling "local:" URIs correctly.
- *
- * If an alternate name for the file is given, and it's not a "local:" file, the alternate
- * name will be added to the classpath (relative to the job's work directory).
- *
- * If not a "local:" file and no alternate name, the environment is not modified.
- *
- * @param path Path to add to classpath (optional).
- * @param fileName Alternate name for the file (optional).
- * @param env Map holding the environment variables.
- */
- private def addClasspathEntry(path: String, fileName: String,
- env: HashMap[String, String]) : Unit = {
- if (path != null) {
- scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
- val localPath = getLocalPath(path)
- if (localPath != null) {
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
- File.pathSeparator)
- return
- }
- }
- }
- if (fileName != null) {
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
- Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
- }
- }
-
- /**
* Returns the local path if the URI is a "local:" URI, or null otherwise.
*/
private def getLocalPath(resource: String): String = {
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0ac162538f..e01ed5a57d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
"--class", "notused",
- "--jar", null,
+ "--jar", null, // The primary jar will be added dynamically in SparkContext.
"--args", hostport,
"--am-class", classOf[ExecutorLauncher].getName
)