Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala                        | 69
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  |  2
2 files changed, 18 insertions(+), 53 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 27a518ccda..aeb3f0062d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
* Client submits an application to the YARN ResourceManager.
*
* Depending on the deployment mode this will launch one of two application master classes:
- * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
+ * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
* which launches a driver program inside of the cluster.
* 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
* request executors on behalf of a driver running outside of the cluster.
@@ -220,10 +220,11 @@ trait ClientBase extends Logging {
}
}
+ val cachedSecondaryJarLinks = ListBuffer.empty[String]
val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
(args.files, LocalResourceType.FILE, false),
(args.archives, LocalResourceType.ARCHIVE, false) )
- fileLists.foreach { case (flist, resType, appMasterOnly) =>
+ fileLists.foreach { case (flist, resType, addToClasspath) =>
if (flist != null && !flist.isEmpty()) {
flist.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
@@ -232,11 +233,15 @@ trait ClientBase extends Logging {
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
- linkname, statCache, appMasterOnly)
+ linkname, statCache)
+ if (addToClasspath) {
+ cachedSecondaryJarLinks += linkname
+ }
}
}
}
}
+ sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
UserGroupInformation.getCurrentUser().addCredentials(credentials)
localResources
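
Each entry in args.addJars is uploaded through the distributed cache under a link name, and the hunk above then serializes those link names into the SparkConf key spark.yarn.secondary.jars so the classpath code further down can recover them. A minimal, self-contained sketch of the link-name rule (an illustration, not the Spark code itself; plain java.net.URI stands in for the Hadoop Path handling):

import java.net.URI

// The URI fragment, when present, becomes the link name under which the
// file is localized; otherwise the last path component is used.
def linkNameOf(entry: String): String = {
  val uri = new URI(entry.trim)
  Option(uri.getFragment).getOrElse(uri.getPath.split('/').last)
}

linkNameOf("hdfs:///libs/util-1.0.jar")           // "util-1.0.jar"
linkNameOf("hdfs:///libs/util-1.0.jar#util.jar")  // "util.jar" (fragment alias)
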
@@ -374,11 +379,12 @@ trait ClientBase extends Logging {
}
object ClientBase {
- val SPARK_JAR: String = "spark.jar"
- val APP_JAR: String = "app.jar"
+ val SPARK_JAR: String = "__spark__.jar"
+ val APP_JAR: String = "__app__.jar"
val LOG4J_PROP: String = "log4j.properties"
val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
val LOCAL_SCHEME = "local"
+ val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
@@ -479,67 +485,26 @@ object ClientBase {
extraClassPath.foreach(addClasspathEntry)
- addClasspathEntry(Environment.PWD.$())
+ val cachedSecondaryJarLinks =
+ sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",").filter(_.nonEmpty)
// Normally the user's app.jar is last in case it conflicts with the Spark jars
if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
addPwdClasspathEntry(APP_JAR)
+ cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
addPwdClasspathEntry(SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)
} else {
addPwdClasspathEntry(SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)
addPwdClasspathEntry(APP_JAR)
+ cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
}
+ // Append all class files and jar files under the working directory to the classpath.
+ addClasspathEntry(Environment.PWD.$())
addPwdClasspathEntry("*")
}
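
Taken together, the two branches above differ only in whether the user's jars come before or after the Spark jar; the secondary-jar links now follow the app jar in both orders. A sketch of the resulting ordering, with classpathOrder as a hypothetical condensation of the branch logic ("." stands for $PWD, which is appended afterwards along with the "*" wildcard):

// Hypothetical condensation of the if/else above; the Hadoop classpath
// entries are omitted for brevity.
def classpathOrder(userFirst: Boolean, secondaryJars: Seq[String]): Seq[String] = {
  val user  = "__app__.jar" +: secondaryJars
  val spark = Seq("__spark__.jar")
  val order = if (userFirst) user ++ spark else spark ++ user
  order ++ Seq(".", "*")  // $PWD itself, then every jar under it
}

classpathOrder(userFirst = false, Seq("util.jar"))
// Seq("__spark__.jar", "__app__.jar", "util.jar", ".", "*")
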
/**
- * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
- * to the classpath.
- */
- private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
- if (args != null) {
- addClasspathEntry(args.userJar, APP_JAR, env)
- }
-
- if (args != null && args.addJars != null) {
- args.addJars.split(",").foreach { case file: String =>
- addClasspathEntry(file, null, env)
- }
- }
- }
-
- /**
- * Adds the given path to the classpath, handling "local:" URIs correctly.
- *
- * If an alternate name for the file is given, and it's not a "local:" file, the alternate
- * name will be added to the classpath (relative to the job's work directory).
- *
- * If not a "local:" file and no alternate name, the environment is not modified.
- *
- * @param path Path to add to classpath (optional).
- * @param fileName Alternate name for the file (optional).
- * @param env Map holding the environment variables.
- */
- private def addClasspathEntry(path: String, fileName: String,
- env: HashMap[String, String]) : Unit = {
- if (path != null) {
- scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
- val localPath = getLocalPath(path)
- if (localPath != null) {
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
- File.pathSeparator)
- return
- }
- }
- }
- if (fileName != null) {
- YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
- Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
- }
- }
-
- /**
* Returns the local path if the URI is a "local:" URI, or null otherwise.
*/
private def getLocalPath(resource: String): String = {
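
The retained comment documents the contract of getLocalPath, whose body is cut off in this excerpt. A minimal sketch of what that contract implies, inferred from the doc comment and the LOCAL_SCHEME constant above rather than from the excerpted body:

import java.net.URI

// "local:" URIs name files already present on every node, so the path is
// returned verbatim; anything else yields null and is distributed instead.
def getLocalPath(resource: String): String = {
  val uri = new URI(resource)
  if ("local" == uri.getScheme) uri.getPath else null
}
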
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0ac162538f..e01ed5a57d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
"--class", "notused",
- "--jar", null,
+ "--jar", null, // The primary jar will be added dynamically in SparkContext.
"--args", hostport,
"--am-class", classOf[ExecutorLauncher].getName
)
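
Passing null for --jar works here because, in yarn-client mode, the driver runs locally and the application jar reaches the executors through SparkContext rather than through this YARN client, as the new comment notes. A hedged sketch of the usual client-mode setup under the Spark 1.x API (paths hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

// The jar listed here is shipped to executors by SparkContext itself,
// which is why the YARN client above can leave --jar unset.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("example")
  .setJars(Seq("/path/to/app.jar"))  // hypothetical path
val sc = new SparkContext(conf)
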