aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortgravescs <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-10-03 11:52:16 -0500
committertgravescs <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-10-03 11:52:16 -0500
commit0fff4ee8523ff4137eedfc314b51135427137c63 (patch)
tree9f96cb33c2d5c3dc54b34a4716052957a1841633
parent9d4246863a25f7c91f324e004fe000b9848f6057 (diff)
downloadspark-0fff4ee8523ff4137eedfc314b51135427137c63.tar.gz
spark-0fff4ee8523ff4137eedfc314b51135427137c63.tar.bz2
spark-0fff4ee8523ff4137eedfc314b51135427137c63.zip
Adding in the --addJars option to make SparkContext.addJar work on yarn and cleanup
the classpaths
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala17
-rw-r--r--docs/running-on-yarn.md2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala27
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala10
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala17
5 files changed, 50 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 72540c712a..8ed5dcbf5d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -643,10 +643,21 @@ class SparkContext(
key = uri.getScheme match {
case null | "file" =>
if (env.hadoop.isYarnMode()) {
- logWarning("local jar specified as parameter to addJar under Yarn mode")
- return
+ // In order for this to work on yarn the user must specify the --addjars option to
+ // the client to upload the file into the distributed cache to make it show up in the
+ // current working directory.
+ val fileName = new Path(uri.getPath).getName()
+ try {
+ env.httpFileServer.addJar(new File(fileName))
+ } catch {
+ case e: Exception => {
+ logError("Error adding jar (" + e + "), was the --addJars option used?")
+ throw e
+ }
+ }
+ } else {
+ env.httpFileServer.addJar(new File(uri.getPath))
}
- env.httpFileServer.addJar(new File(uri.getPath))
case _ =>
path
}
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index beaae69aa2..a807ec603d 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -53,6 +53,7 @@ The command to launch the YARN Client is as follows:
--worker-memory <MEMORY_PER_WORKER> \
--worker-cores <CORES_PER_WORKER> \
--queue <queue_name> \
+ --addJars <any_local_files_used_in_SparkContext.addJar> \
--files <files_for_distributed_cache> \
--archives <archives_for_distributed_cache>
@@ -88,3 +89,4 @@ The above starts a YARN Client programs which periodically polls the Application
- We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed.
- The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
- The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
+- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b4d243ed7a..fb1b339f27 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,7 +127,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
originalPath: Path,
replication: Short,
localResources: HashMap[String,LocalResource],
- fragment: String) = {
+ fragment: String,
+ appMasterOnly: Boolean = false): Unit = {
val fs = FileSystem.get(conf)
val newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
@@ -149,6 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
pathURI = new URI(newPath.toString() + "#" + fragment);
}
val distPath = pathURI.toString()
+ if (appMasterOnly == true) return
if (resourceType == LocalResourceType.FILE) {
distFiles match {
case Some(path) =>
@@ -223,6 +225,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
+ // handle any add jars
+ if ((args.addJars != null) && (!args.addJars.isEmpty())){
+ args.addJars.split(',').foreach { case file: String =>
+ val tmpURI = new URI(file)
+ val tmp = new Path(tmpURI)
+ copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
+ tmpURI.getFragment(), true)
+ }
+ }
+
// handle any distributed cache files
if ((args.files != null) && (!args.files.isEmpty())){
args.files.split(',').foreach { case file: String =>
@@ -253,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val env = new HashMap[String, String]()
- // If log4j present, ensure ours overrides all others
- if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+ Environment.PWD.$() + Path.SEPARATOR + "*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
Client.populateHadoopClasspath(yarnConf, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
@@ -279,6 +290,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}
+ // set the environment variables to be passed on to the Workers
if (distFiles != None) {
env("SPARK_YARN_CACHE_FILES") = distFiles.get
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
@@ -328,8 +340,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add Xmx for am memory
JAVA_OPTS += "-Xmx" + amMemory + "m "
- JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
@@ -345,6 +357,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
}
+
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 30d9b6e60f..0833153541 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,6 +24,7 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
class ClientArguments(val args: Array[String]) {
+ var addJars: String = null
var files: String = null
var archives: String = null
var userJar: String = null
@@ -80,6 +81,10 @@ class ClientArguments(val args: Array[String]) {
amQueue = value
args = tail
+ case ("--addJars") :: value :: tail =>
+ addJars = value
+ args = tail
+
case ("--files") :: value :: tail =>
files = value
args = tail
@@ -119,8 +124,9 @@ class ClientArguments(val args: Array[String]) {
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
- " --files file Comma separated list of files to be distributed with the job.\n" +
- " --archives archive Comma separated list of archives to be distributed with the job."
+ " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
+ " --files files Comma separated list of files to be distributed with the job.\n" +
+ " --archives archives Comma separated list of archives to be distributed with the job."
)
System.exit(exitCode)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index d340b114df..8dac9e02ac 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
- JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
@@ -215,15 +216,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- // If log4j present, ensure ours overrides all others
- if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
- // Which is correct ?
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
- }
-
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name,
+ Environment.PWD.$() + Path.SEPARATOR + "*")
Client.populateHadoopClasspath(yarnConf, env)
// allow users to specify some environment variables